test_voice_segmentation_fix.py
9.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# AIfeng/2025-01-02 16:03:47
# 语音分割修复验证测试工具
# 用于对比原版和改进版录音器的语音分割效果
import os
import time
import shutil
from datetime import datetime
from recorder_sync import RecorderSync
from recorder_sync_improved import RecorderSyncImproved
from logger import get_logger
logger = get_logger("VoiceSegmentationTest")
class VoiceSegmentationTester:
def __init__(self):
self.cache_dir = "cache_data"
self.backup_dir = "cache_data_backup"
self.test_results = []
def clear_cache(self):
"""清空缓存目录"""
if os.path.exists(self.cache_dir):
for file in os.listdir(self.cache_dir):
if file.endswith('.wav'):
os.remove(os.path.join(self.cache_dir, file))
logger.info("缓存目录已清空")
def count_wav_files(self):
"""统计WAV文件数量"""
if not os.path.exists(self.cache_dir):
return 0
return len([f for f in os.listdir(self.cache_dir) if f.endswith('.wav')])
def backup_files(self, test_name):
"""备份测试文件"""
if not os.path.exists(self.cache_dir):
return
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = os.path.join(self.backup_dir, f"{test_name}_{timestamp}")
if not os.path.exists(self.backup_dir):
os.makedirs(self.backup_dir)
if not os.path.exists(backup_path):
os.makedirs(backup_path)
for file in os.listdir(self.cache_dir):
if file.endswith('.wav'):
shutil.copy2(
os.path.join(self.cache_dir, file),
os.path.join(backup_path, file)
)
logger.info(f"文件已备份到: {backup_path}")
return backup_path
def create_status_callback(self, test_name):
"""创建状态回调函数"""
def status_callback():
file_count = self.count_wav_files()
print(f"\r{test_name} - 当前生成文件数: {file_count}", end="", flush=True)
return status_callback
def test_original_recorder(self, duration=10):
"""测试原版录音器"""
print("\n=== 测试原版录音器 ===")
self.clear_cache()
recorder = RecorderSync()
print("请准备说话,3秒后开始录音...")
time.sleep(3)
print("开始录音(原版)...")
recorder.start_recording()
# 状态监控
start_time = time.time()
status_callback = self.create_status_callback("原版录音器")
while time.time() - start_time < duration:
status_callback()
time.sleep(0.1)
print("\n停止录音...")
recorder.stop_recording()
file_count = self.count_wav_files()
backup_path = self.backup_files("original")
result = {
'type': '原版录音器',
'file_count': file_count,
'backup_path': backup_path,
'timestamp': datetime.now().isoformat()
}
self.test_results.append(result)
print(f"原版录音器测试完成 - 生成文件数: {file_count}")
return result
def test_improved_recorder(self, duration=10):
"""测试改进版录音器"""
print("\n=== 测试改进版录音器 ===")
self.clear_cache()
recorder = RecorderSyncImproved(
volume_threshold=0.03,
silence_duration=1.5,
min_speech_duration=0.5
)
print("请准备说话,3秒后开始录音...")
time.sleep(3)
print("开始录音(改进版)...")
recorder.start_recording()
# 状态监控
start_time = time.time()
status_callback = self.create_status_callback("改进版录音器")
while time.time() - start_time < duration:
status_callback()
status = recorder.get_status()
if status['is_speaking']:
print(f"\r改进版录音器 - 检测到语音中... 文件数: {self.count_wav_files()}", end="", flush=True)
time.sleep(0.1)
print("\n停止录音...")
recorder.stop_recording()
file_count = self.count_wav_files()
backup_path = self.backup_files("improved")
result = {
'type': '改进版录音器',
'file_count': file_count,
'backup_path': backup_path,
'timestamp': datetime.now().isoformat()
}
self.test_results.append(result)
print(f"改进版录音器测试完成 - 生成文件数: {file_count}")
return result
def compare_results(self):
"""对比测试结果"""
if len(self.test_results) < 2:
print("需要至少两个测试结果才能进行对比")
return
print("\n=== 测试结果对比 ===")
for result in self.test_results:
print(f"{result['type']}: {result['file_count']} 个文件")
original = next((r for r in self.test_results if '原版' in r['type']), None)
improved = next((r for r in self.test_results if '改进版' in r['type']), None)
if original and improved:
reduction = original['file_count'] - improved['file_count']
if reduction > 0:
print(f"\n✅ 改进效果: 减少了 {reduction} 个文件分割")
print(f" 分割减少率: {reduction/original['file_count']*100:.1f}%")
elif reduction < 0:
print(f"\n❌ 意外结果: 增加了 {abs(reduction)} 个文件分割")
else:
print(f"\n➖ 文件数量相同,需要进一步分析")
def save_test_report(self):
"""保存测试报告"""
report_file = f"voice_segmentation_test_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
with open(report_file, 'w', encoding='utf-8') as f:
f.write("语音分割修复验证测试报告\n")
f.write("=" * 40 + "\n")
f.write(f"测试时间: {datetime.now().isoformat()}\n\n")
for result in self.test_results:
f.write(f"测试类型: {result['type']}\n")
f.write(f"生成文件数: {result['file_count']}\n")
f.write(f"备份路径: {result['backup_path']}\n")
f.write(f"测试时间: {result['timestamp']}\n")
f.write("-" * 30 + "\n")
if len(self.test_results) >= 2:
original = next((r for r in self.test_results if '原版' in r['type']), None)
improved = next((r for r in self.test_results if '改进版' in r['type']), None)
if original and improved:
reduction = original['file_count'] - improved['file_count']
f.write(f"\n对比结果:\n")
f.write(f"文件分割减少数量: {reduction}\n")
if original['file_count'] > 0:
f.write(f"分割减少率: {reduction/original['file_count']*100:.1f}%\n")
print(f"\n测试报告已保存: {report_file}")
return report_file
def interactive_menu(self):
"""交互式菜单"""
while True:
print("\n=== 语音分割修复验证工具 ===")
print("1. 测试原版录音器")
print("2. 测试改进版录音器")
print("3. 对比测试结果")
print("4. 保存测试报告")
print("5. 清空缓存")
print("6. 查看当前文件数")
print("0. 退出")
choice = input("\n请选择操作 (0-6): ").strip()
if choice == '1':
duration = input("请输入录音时长(秒,默认10): ").strip()
duration = int(duration) if duration.isdigit() else 10
self.test_original_recorder(duration)
elif choice == '2':
duration = input("请输入录音时长(秒,默认10): ").strip()
duration = int(duration) if duration.isdigit() else 10
self.test_improved_recorder(duration)
elif choice == '3':
self.compare_results()
elif choice == '4':
self.save_test_report()
elif choice == '5':
self.clear_cache()
print("缓存已清空")
elif choice == '6':
count = self.count_wav_files()
print(f"当前缓存目录中有 {count} 个WAV文件")
elif choice == '0':
print("退出测试工具")
break
else:
print("无效选择,请重试")
def main():
"""主函数"""
print("语音分割修复验证测试工具")
print("用于验证改进版录音器是否解决了语音过度分割问题")
tester = VoiceSegmentationTester()
# 检查必要的目录
if not os.path.exists(tester.cache_dir):
os.makedirs(tester.cache_dir)
print(f"创建缓存目录: {tester.cache_dir}")
# 启动交互式菜单
tester.interactive_menu()
if __name__ == "__main__":
main()