test_voice_segmentation_fix.py 9.46 KB
# AIfeng/2025-01-02 16:03:47
# 语音分割修复验证测试工具
# 用于对比原版和改进版录音器的语音分割效果

import os
import time
import shutil
from datetime import datetime
from recorder_sync import RecorderSync
from recorder_sync_improved import RecorderSyncImproved
from logger import get_logger

logger = get_logger("VoiceSegmentationTest")

class VoiceSegmentationTester:
    def __init__(self):
        self.cache_dir = "cache_data"
        self.backup_dir = "cache_data_backup"
        self.test_results = []
        
    def clear_cache(self):
        """清空缓存目录"""
        if os.path.exists(self.cache_dir):
            for file in os.listdir(self.cache_dir):
                if file.endswith('.wav'):
                    os.remove(os.path.join(self.cache_dir, file))
        logger.info("缓存目录已清空")
    
    def count_wav_files(self):
        """统计WAV文件数量"""
        if not os.path.exists(self.cache_dir):
            return 0
        return len([f for f in os.listdir(self.cache_dir) if f.endswith('.wav')])
    
    def backup_files(self, test_name):
        """备份测试文件"""
        if not os.path.exists(self.cache_dir):
            return
        
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = os.path.join(self.backup_dir, f"{test_name}_{timestamp}")
        
        if not os.path.exists(self.backup_dir):
            os.makedirs(self.backup_dir)
        
        if not os.path.exists(backup_path):
            os.makedirs(backup_path)
        
        for file in os.listdir(self.cache_dir):
            if file.endswith('.wav'):
                shutil.copy2(
                    os.path.join(self.cache_dir, file),
                    os.path.join(backup_path, file)
                )
        
        logger.info(f"文件已备份到: {backup_path}")
        return backup_path
    
    def create_status_callback(self, test_name):
        """创建状态回调函数"""
        def status_callback():
            file_count = self.count_wav_files()
            print(f"\r{test_name} - 当前生成文件数: {file_count}", end="", flush=True)
        return status_callback
    
    def test_original_recorder(self, duration=10):
        """测试原版录音器"""
        print("\n=== 测试原版录音器 ===")
        self.clear_cache()
        
        recorder = RecorderSync()
        
        print("请准备说话,3秒后开始录音...")
        time.sleep(3)
        
        print("开始录音(原版)...")
        recorder.start_recording()
        
        # 状态监控
        start_time = time.time()
        status_callback = self.create_status_callback("原版录音器")
        
        while time.time() - start_time < duration:
            status_callback()
            time.sleep(0.1)
        
        print("\n停止录音...")
        recorder.stop_recording()
        
        file_count = self.count_wav_files()
        backup_path = self.backup_files("original")
        
        result = {
            'type': '原版录音器',
            'file_count': file_count,
            'backup_path': backup_path,
            'timestamp': datetime.now().isoformat()
        }
        
        self.test_results.append(result)
        print(f"原版录音器测试完成 - 生成文件数: {file_count}")
        return result
    
    def test_improved_recorder(self, duration=10):
        """测试改进版录音器"""
        print("\n=== 测试改进版录音器 ===")
        self.clear_cache()
        
        recorder = RecorderSyncImproved(
            volume_threshold=0.03,
            silence_duration=1.5,
            min_speech_duration=0.5
        )
        
        print("请准备说话,3秒后开始录音...")
        time.sleep(3)
        
        print("开始录音(改进版)...")
        recorder.start_recording()
        
        # 状态监控
        start_time = time.time()
        status_callback = self.create_status_callback("改进版录音器")
        
        while time.time() - start_time < duration:
            status_callback()
            status = recorder.get_status()
            if status['is_speaking']:
                print(f"\r改进版录音器 - 检测到语音中... 文件数: {self.count_wav_files()}", end="", flush=True)
            time.sleep(0.1)
        
        print("\n停止录音...")
        recorder.stop_recording()
        
        file_count = self.count_wav_files()
        backup_path = self.backup_files("improved")
        
        result = {
            'type': '改进版录音器',
            'file_count': file_count,
            'backup_path': backup_path,
            'timestamp': datetime.now().isoformat()
        }
        
        self.test_results.append(result)
        print(f"改进版录音器测试完成 - 生成文件数: {file_count}")
        return result
    
    def compare_results(self):
        """对比测试结果"""
        if len(self.test_results) < 2:
            print("需要至少两个测试结果才能进行对比")
            return
        
        print("\n=== 测试结果对比 ===")
        for result in self.test_results:
            print(f"{result['type']}: {result['file_count']} 个文件")
        
        original = next((r for r in self.test_results if '原版' in r['type']), None)
        improved = next((r for r in self.test_results if '改进版' in r['type']), None)
        
        if original and improved:
            reduction = original['file_count'] - improved['file_count']
            if reduction > 0:
                print(f"\n✅ 改进效果: 减少了 {reduction} 个文件分割")
                print(f"   分割减少率: {reduction/original['file_count']*100:.1f}%")
            elif reduction < 0:
                print(f"\n❌ 意外结果: 增加了 {abs(reduction)} 个文件分割")
            else:
                print(f"\n➖ 文件数量相同,需要进一步分析")
    
    def save_test_report(self):
        """保存测试报告"""
        report_file = f"voice_segmentation_test_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write("语音分割修复验证测试报告\n")
            f.write("=" * 40 + "\n")
            f.write(f"测试时间: {datetime.now().isoformat()}\n\n")
            
            for result in self.test_results:
                f.write(f"测试类型: {result['type']}\n")
                f.write(f"生成文件数: {result['file_count']}\n")
                f.write(f"备份路径: {result['backup_path']}\n")
                f.write(f"测试时间: {result['timestamp']}\n")
                f.write("-" * 30 + "\n")
            
            if len(self.test_results) >= 2:
                original = next((r for r in self.test_results if '原版' in r['type']), None)
                improved = next((r for r in self.test_results if '改进版' in r['type']), None)
                
                if original and improved:
                    reduction = original['file_count'] - improved['file_count']
                    f.write(f"\n对比结果:\n")
                    f.write(f"文件分割减少数量: {reduction}\n")
                    if original['file_count'] > 0:
                        f.write(f"分割减少率: {reduction/original['file_count']*100:.1f}%\n")
        
        print(f"\n测试报告已保存: {report_file}")
        return report_file
    
    def interactive_menu(self):
        """交互式菜单"""
        while True:
            print("\n=== 语音分割修复验证工具 ===")
            print("1. 测试原版录音器")
            print("2. 测试改进版录音器")
            print("3. 对比测试结果")
            print("4. 保存测试报告")
            print("5. 清空缓存")
            print("6. 查看当前文件数")
            print("0. 退出")
            
            choice = input("\n请选择操作 (0-6): ").strip()
            
            if choice == '1':
                duration = input("请输入录音时长(秒,默认10): ").strip()
                duration = int(duration) if duration.isdigit() else 10
                self.test_original_recorder(duration)
            
            elif choice == '2':
                duration = input("请输入录音时长(秒,默认10): ").strip()
                duration = int(duration) if duration.isdigit() else 10
                self.test_improved_recorder(duration)
            
            elif choice == '3':
                self.compare_results()
            
            elif choice == '4':
                self.save_test_report()
            
            elif choice == '5':
                self.clear_cache()
                print("缓存已清空")
            
            elif choice == '6':
                count = self.count_wav_files()
                print(f"当前缓存目录中有 {count} 个WAV文件")
            
            elif choice == '0':
                print("退出测试工具")
                break
            
            else:
                print("无效选择,请重试")

def main():
    """主函数"""
    print("语音分割修复验证测试工具")
    print("用于验证改进版录音器是否解决了语音过度分割问题")
    
    tester = VoiceSegmentationTester()
    
    # 检查必要的目录
    if not os.path.exists(tester.cache_dir):
        os.makedirs(tester.cache_dir)
        print(f"创建缓存目录: {tester.cache_dir}")
    
    # 启动交互式菜单
    tester.interactive_menu()

if __name__ == "__main__":
    main()