monitor_recording.py
7.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AIfeng/2025-01-02 10:27:06
录音过程监控脚本
实时监控录音缓存文件和识别结果
"""
import os
import sys
import time
import json
import wave
from datetime import datetime
from pathlib import Path
def monitor_cache_files():
"""监控缓存文件变化"""
cache_dir = "cache_data"
if not os.path.exists(cache_dir):
os.makedirs(cache_dir, exist_ok=True)
print(f"📁 监控目录: {os.path.abspath(cache_dir)}")
print(f"⏰ 开始时间: {datetime.now().strftime('%H:%M:%S')}")
print("=" * 60)
last_files = set()
file_info = {} # 存储文件信息
while True:
try:
current_files = set(os.listdir(cache_dir)) if os.path.exists(cache_dir) else set()
# 检查新增文件
new_files = current_files - last_files
for file in new_files:
file_path = os.path.join(cache_dir, file)
if os.path.isfile(file_path):
stat = os.stat(file_path)
file_info[file] = {
'created': datetime.fromtimestamp(stat.st_ctime),
'size': stat.st_size,
'path': file_path
}
print(f"🎵 新音频文件: {file}")
print(f" 📊 大小: {stat.st_size:,} bytes")
print(f" ⏰ 创建: {datetime.fromtimestamp(stat.st_ctime).strftime('%H:%M:%S.%f')[:-3]}")
# 分析WAV文件
try:
with wave.open(file_path, 'rb') as wf:
duration = wf.getnframes() / wf.getframerate()
print(f" 🎼 时长: {duration:.2f}秒, 采样率: {wf.getframerate()}Hz")
except Exception as e:
print(f" ❌ WAV分析失败: {e}")
print()
# 检查删除的文件
deleted_files = last_files - current_files
for file in deleted_files:
if file in file_info:
info = file_info[file]
existed_time = (datetime.now() - info['created']).total_seconds()
print(f"🗑️ 文件删除: {file}")
print(f" ⏱️ 存在时长: {existed_time:.2f}秒")
print(f" 📊 文件大小: {info['size']:,} bytes")
print()
del file_info[file]
last_files = current_files
# 显示当前状态
if len(current_files) > 0:
print(f"\r📂 当前文件数: {len(current_files)} | 时间: {datetime.now().strftime('%H:%M:%S')}", end="", flush=True)
else:
print(f"\r⭕ 无缓存文件 | 时间: {datetime.now().strftime('%H:%M:%S')}", end="", flush=True)
time.sleep(0.5)
except KeyboardInterrupt:
print("\n\n🛑 用户中断监控")
break
except Exception as e:
print(f"\n❌ 监控出错: {e}")
time.sleep(1)
def monitor_recognition_results():
"""监控识别结果日志"""
log_file = "recognition_results.log"
print(f"📝 监控识别结果: {os.path.abspath(log_file)}")
if not os.path.exists(log_file):
# 创建空日志文件
with open(log_file, 'w', encoding='utf-8') as f:
f.write(f"# 识别结果日志 - {datetime.now()}\n")
# 获取文件初始大小
last_size = os.path.getsize(log_file)
while True:
try:
current_size = os.path.getsize(log_file)
if current_size > last_size:
# 文件有新内容
with open(log_file, 'r', encoding='utf-8') as f:
f.seek(last_size)
new_content = f.read()
if new_content.strip():
lines = new_content.strip().split('\n')
for line in lines:
if line.strip() and not line.startswith('#'):
print(f"\n🎯 识别结果: {line}")
last_size = current_size
time.sleep(0.5)
except KeyboardInterrupt:
break
except Exception as e:
print(f"\n❌ 日志监控出错: {e}")
time.sleep(1)
def analyze_existing_files():
"""分析现有的缓存文件"""
cache_dir = "cache_data"
print("🔍 分析现有缓存文件")
print("=" * 40)
if not os.path.exists(cache_dir):
print("❌ 缓存目录不存在")
return
files = [f for f in os.listdir(cache_dir) if f.endswith('.wav')]
if not files:
print("📭 缓存目录为空")
return
print(f"📁 找到 {len(files)} 个音频文件:")
for file in sorted(files):
file_path = os.path.join(cache_dir, file)
stat = os.stat(file_path)
print(f"\n🎵 {file}")
print(f" 📊 大小: {stat.st_size:,} bytes")
print(f" 📅 创建: {datetime.fromtimestamp(stat.st_ctime)}")
print(f" 📝 修改: {datetime.fromtimestamp(stat.st_mtime)}")
# 分析WAV文件
try:
with wave.open(file_path, 'rb') as wf:
channels = wf.getnchannels()
sample_width = wf.getsampwidth()
framerate = wf.getframerate()
frames = wf.getnframes()
duration = frames / framerate
print(f" 🎼 格式: {channels}ch, {sample_width*8}bit, {framerate}Hz")
print(f" ⏱️ 时长: {duration:.2f}秒 ({frames:,} 帧)")
# 计算文件年龄
age = (datetime.now() - datetime.fromtimestamp(stat.st_mtime)).total_seconds()
print(f" 🕐 文件年龄: {age:.1f}秒")
except Exception as e:
print(f" ❌ WAV分析失败: {e}")
print()
def show_system_info():
"""显示系统信息"""
print("💻 系统信息")
print("=" * 30)
print(f"🐍 Python: {sys.version.split()[0]}")
print(f"📂 工作目录: {os.getcwd()}")
print(f"⏰ 当前时间: {datetime.now()}")
# 检查关键目录
dirs_to_check = ['cache_data', 'logs', 'audio', 'temp']
print(f"\n📁 目录状态:")
for dir_name in dirs_to_check:
exists = os.path.exists(dir_name)
if exists:
file_count = len([f for f in os.listdir(dir_name) if os.path.isfile(os.path.join(dir_name, f))])
print(f" ✅ {dir_name}: 存在 ({file_count} 个文件)")
else:
print(f" ❌ {dir_name}: 不存在")
print()
def main():
"""主函数"""
print("🎙️ 录音过程监控工具")
print("=" * 50)
show_system_info()
analyze_existing_files()
print("🚀 开始实时监控...")
print("💡 提示: 在浏览器中访问 http://localhost:5050/voice_test_sync.html 进行录音测试")
print("⌨️ 按 Ctrl+C 停止监控")
print("=" * 50)
try:
# 启动监控
monitor_cache_files()
except KeyboardInterrupt:
print("\n\n👋 监控已停止")
except Exception as e:
print(f"\n❌ 监控出错: {e}")
print("\n📊 最终分析:")
analyze_existing_files()
print("\n✅ 监控完成")
if __name__ == "__main__":
main()