# test_streaming_recognition.py (15.6 KB)
# AIfeng/2025-07-07 09:34:55
# 流式语音识别系统测试
# 测试流式VAD、识别结果管理和完整的流式录音功能

import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import time
import threading
from streaming.streaming_recorder import StreamingRecorder
from streaming.streaming_vad import StreamingVAD
from streaming.streaming_recognition_manager import StreamingRecognitionManager
from logger import get_logger

# Module-level logger; within this file it is only used by main() to record
# unexpected exceptions raised while the tests run.
logger = get_logger("TestStreamingRecognition")

class StreamingRecognitionTester:
    """Smoke-test harness for the streaming speech-recognition components.

    Each ``test_*`` method exercises one component (``StreamingVAD``,
    ``StreamingRecognitionManager``, ``StreamingRecorder``) and records every
    individual check through :meth:`log_result`.  Failures are recorded rather
    than raised, so one broken component never prevents the remaining suites
    from running.

    Attributes:
        test_results: Chronological list of result dicts with the keys
            ``test_name``, ``status``, ``details`` and ``timestamp``.
        current_test: Initialised to ``None``; not used by the methods of
            this class.
    """

    # Console marker for each known status; any other status is shown as "⚠".
    _STATUS_SYMBOLS = {"PASS": "✓", "FAIL": "✗"}

    def __init__(self):
        self.test_results = []
        self.current_test = None

    def log_result(self, test_name: str, status: str, details: str = ""):
        """Record one check outcome and echo it to stdout.

        Args:
            test_name: Human-readable name of the check.
            status: Outcome label.  ``run_all_tests`` counts "PASS", "FAIL"
                and "WARN".
            details: Optional free-form explanation appended to the output.
        """
        self.test_results.append({
            'test_name': test_name,
            'status': status,
            'details': details,
            'timestamp': time.time(),
        })

        status_symbol = self._STATUS_SYMBOLS.get(status, "⚠")
        print(f"{status_symbol} {test_name}: {status} {details}")

    def _check(self, test_name: str, passed: object, pass_details: str = "",
               fail_details: str = "", fail_status: str = "FAIL") -> bool:
        """Record "PASS" when ``passed`` is truthy, otherwise ``fail_status``.

        Args:
            test_name: Name of the check, forwarded to :meth:`log_result`.
            passed: Outcome of the check; evaluated for truthiness.
            pass_details: Details recorded for a passing check.
            fail_details: Details recorded for a non-passing check.
            fail_status: Status recorded for a non-passing check
                ("FAIL" by default, "WARN" for soft checks).

        Returns:
            bool: The truthiness of ``passed``.
        """
        ok = bool(passed)
        if ok:
            self.log_result(test_name, "PASS", pass_details)
        else:
            self.log_result(test_name, fail_status, fail_details)
        return ok

    def test_streaming_vad(self) -> None:
        """Check construction, status reporting, reset and frame handling of StreamingVAD."""
        print("\n=== 测试流式VAD功能 ===")

        try:
            vad = StreamingVAD(
                sample_rate=16000,
                chunk_size=1024,
                volume_threshold=0.03,
                silence_duration=1.0,
                min_speech_duration=0.3,
                max_speech_duration=10.0,
                partial_result_interval=2.0,
            )
            self.log_result("VAD初始化", "PASS", "成功创建StreamingVAD实例")

            # The status report must expose at least these fields.
            status = vad.get_status()
            expected_keys = ['is_speaking', 'dynamic_threshold', 'volume_threshold']
            self._check(
                "VAD状态获取",
                all(key in status for key in expected_keys),
                "状态包含所有必要字段",
                "状态缺少必要字段",
            )

            # After a reset the detector must not report ongoing speech.
            vad.reset()
            status_after_reset = vad.get_status()
            self._check(
                "VAD重置功能",
                not status_after_reset['is_speaking'],
                "重置后状态正确",
                "重置后状态异常",
            )

            # Imported inside the try block so a missing numpy is reported as
            # a failed test instead of breaking the import of this module.
            import numpy as np

            # An all-zero frame must be classified as silence.
            silent_data = np.zeros(1024, dtype=np.int16).tobytes()
            result = vad.process_audio_frame(silent_data)
            self._check(
                "VAD静音检测",
                result['action'] == 'silence' and not result['is_speaking'],
                "正确检测静音",
                f"静音检测异常: {result['action']}",
            )

            # Random noise stands in for speech; only the volume is checked.
            speech_data = np.random.randint(-5000, 5000, 1024, dtype=np.int16).tobytes()
            result = vad.process_audio_frame(speech_data)
            if result['volume'] > 0:
                self.log_result("VAD音量计算", "PASS", f"音量计算正常: {result['volume']:.4f}")
            else:
                self.log_result("VAD音量计算", "FAIL", "音量计算异常")

        except Exception as e:
            self.log_result("VAD测试异常", "FAIL", str(e))

    def test_recognition_manager(self) -> None:
        """Walk one session through the StreamingRecognitionManager life cycle."""
        print("\n=== 测试识别结果管理器 ===")

        try:
            manager = StreamingRecognitionManager(
                confidence_threshold=0.6,
                max_session_duration=30.0,
                result_merge_window=1.0,
            )
            self.log_result("识别管理器初始化", "PASS", "成功创建StreamingRecognitionManager实例")

            # Every later check needs the session, so stop if it cannot be created.
            session_id = "test_session_001"
            created = self._check(
                "会话创建",
                manager.create_session(session_id, {'test': True}),
                f"成功创建会话: {session_id}",
                "会话创建失败",
            )
            if not created:
                return

            self._check(
                "部分结果添加",
                manager.add_partial_result(session_id, "你好", confidence=0.8),
                "成功添加部分结果",
                "部分结果添加失败",
            )

            # Adding the identical partial result again is expected to be
            # rejected; acceptance is only a warning, not a failure.
            self._check(
                "重复结果检测",
                not manager.add_partial_result(session_id, "你好", confidence=0.8),
                "正确检测并跳过重复结果",
                "重复结果检测可能有问题",
                fail_status="WARN",
            )

            self._check(
                "最终结果添加",
                manager.add_final_result(session_id, "你好,世界", confidence=0.9),
                "成功添加最终结果",
                "最终结果添加失败",
            )

            merged_result = manager.get_merged_result(session_id)
            self._check(
                "合并结果获取",
                merged_result,
                f"合并结果: {merged_result}",
                "合并结果为空",
            )

            self._check(
                "会话完成",
                manager.complete_session(session_id),
                "成功完成会话",
                "会话完成失败",
            )

            # The detail message reads the key, so it is only built once the
            # key is known to exist.
            status = manager.get_status()
            if 'active_sessions_count' in status:
                self.log_result("管理器状态获取", "PASS", f"活跃会话数: {status['active_sessions_count']}")
            else:
                self.log_result("管理器状态获取", "FAIL", "状态获取异常")

        except Exception as e:
            self.log_result("识别管理器测试异常", "FAIL", str(e))

    def test_streaming_recorder_basic(self) -> None:
        """Check construction, device listing and initial state of StreamingRecorder."""
        print("\n=== 测试流式录音器基本功能 ===")

        try:
            recorder = StreamingRecorder(
                chunk=1024,
                rate=16000,
                volume_threshold=0.03,
                silence_duration=1.0,
                min_speech_duration=0.3,
                username="test_user",
            )
            self.log_result("录音器初始化", "PASS", "成功创建StreamingRecorder实例")

            # A machine without audio hardware is a warning, not a failure.
            devices = recorder.list_audio_devices()
            if devices:
                self.log_result("音频设备列表", "PASS", f"找到{len(devices)}个音频设备")
                for device in devices[:3]:  # print at most the first three
                    print(f"  设备{device['index']}: {device['name']}")
            else:
                self.log_result("音频设备列表", "WARN", "未找到音频设备")

            status = recorder.get_status()
            expected_keys = ['is_recording', 'vad_status', 'recognition_status']
            self._check(
                "录音器状态获取",
                all(key in status for key in expected_keys),
                "状态包含所有必要字段",
                "状态缺少必要字段",
            )

            # A freshly constructed recorder must not be recording yet.
            self._check(
                "录音状态检查",
                not recorder.is_recording(),
                "初始状态为未录音",
                "初始状态异常",
            )

        except Exception as e:
            self.log_result("录音器基本测试异常", "FAIL", str(e))

    def test_streaming_recorder_callbacks(self) -> None:
        """Check that all four callbacks can be registered on a StreamingRecorder.

        The callbacks are only registered, never fired: firing them would
        require driving the recorder's internals.
        """
        print("\n=== 测试流式录音器回调功能 ===")

        try:
            # Flags flipped by the callbacks below should the recorder ever
            # invoke them.
            callback_results = {
                'partial_result_called': False,
                'final_result_called': False,
                'session_complete_called': False,
                'status_update_called': False
            }

            def on_partial_result(session_id, text, confidence):
                callback_results['partial_result_called'] = True
                print(f"  [回调] 部分结果: {text}")

            def on_final_result(session_id, text, confidence):
                callback_results['final_result_called'] = True
                print(f"  [回调] 最终结果: {text}")

            def on_session_complete(session_id, final_text):
                callback_results['session_complete_called'] = True
                print(f"  [回调] 会话完成: {final_text}")

            def on_status_update(status):
                callback_results['status_update_called'] = True
                if status['type'] == 'vad_status':
                    print(f"  [回调] VAD状态: 语音={status['is_speaking']}, 音量={status['volume']:.3f}")

            recorder = StreamingRecorder(
                chunk=1024,
                rate=16000,
                username="test_user",
            )

            recorder.on_partial_result = on_partial_result
            recorder.on_final_result = on_final_result
            recorder.on_session_complete = on_session_complete
            recorder.on_status_update = on_status_update
            self.log_result("回调函数设置", "PASS", "成功设置所有回调函数")

            # Read every attribute back: all four registered callbacks must
            # still be callable, not just the first one.
            registered = (
                recorder.on_partial_result,
                recorder.on_final_result,
                recorder.on_session_complete,
                recorder.on_status_update,
            )
            self._check(
                "回调函数验证",
                all(callable(callback) for callback in registered),
                "回调函数设置正确",
                "回调函数设置异常",
            )

        except Exception as e:
            self.log_result("回调功能测试异常", "FAIL", str(e))

    def test_integration(self) -> None:
        """Run StreamingVAD and StreamingRecognitionManager together on one session."""
        print("\n=== 集成测试 ===")

        try:
            vad = StreamingVAD()
            manager = StreamingRecognitionManager()

            session_id = "integration_test_001"
            manager.create_session(session_id)

            # Imported inside the try block so a missing numpy is reported as
            # a failed test instead of breaking the import of this module.
            import numpy as np

            # 1. Silence phase: an all-zero frame must be reported as silence.
            #    A mismatch is recorded instead of being silently dropped.
            silent_data = np.zeros(1024, dtype=np.int16).tobytes()
            vad_result = vad.process_audio_frame(silent_data)
            self._check(
                "集成测试-静音阶段",
                vad_result['action'] == 'silence',
                "VAD正确检测静音",
                f"VAD未检测到静音: {vad_result['action']}",
            )

            # 2. Speech phase: random noise stands in for speech.
            speech_data = np.random.randint(-8000, 8000, 1024, dtype=np.int16).tobytes()
            vad_result = vad.process_audio_frame(speech_data)

            if vad_result['action'] in ['speech_start', 'speech_continue']:
                self.log_result("集成测试-语音检测", "PASS", f"VAD检测到语音: {vad_result['action']}")

                manager.add_partial_result(session_id, "测试语音", confidence=0.8)
                merged_result = manager.get_merged_result(session_id)

                # Test truthiness first so an empty/None merge result is
                # recorded as a failed check instead of raising TypeError and
                # skipping the session-completion step below.
                self._check(
                    "集成测试-结果管理",
                    bool(merged_result) and "测试语音" in merged_result,
                    "识别结果正确管理",
                    "识别结果管理异常",
                )
            else:
                # NOTE(review): the VAD defaults are not visible here, so a
                # single frame may legitimately not start speech; recorded as
                # WARN rather than FAIL so the outcome is visible in the summary.
                self.log_result(
                    "集成测试-语音检测",
                    "WARN",
                    f"单帧音频未触发语音检测: {vad_result['action']}",
                )

            # 3. Complete the session and read the merged text one last time.
            manager.complete_session(session_id)
            final_result = manager.get_merged_result(session_id)
            self._check(
                "集成测试-会话完成",
                final_result,
                f"最终结果: {final_result}",
                "最终结果为空",
                fail_status="WARN",
            )

        except Exception as e:
            self.log_result("集成测试异常", "FAIL", str(e))

    def run_all_tests(self) -> bool:
        """Run every test suite, print a summary and report overall success.

        Returns:
            bool: ``True`` when no recorded result has the status "FAIL".
        """
        print("开始流式语音识别系统测试...")
        print("=" * 50)

        self.test_streaming_vad()
        self.test_recognition_manager()
        self.test_streaming_recorder_basic()
        self.test_streaming_recorder_callbacks()
        self.test_integration()

        print("\n" + "=" * 50)
        print("测试结果统计:")

        pass_count = sum(1 for r in self.test_results if r['status'] == 'PASS')
        fail_count = sum(1 for r in self.test_results if r['status'] == 'FAIL')
        warn_count = sum(1 for r in self.test_results if r['status'] == 'WARN')
        total_count = len(self.test_results)

        print(f"总测试数: {total_count}")
        print(f"通过: {pass_count} ✓")
        print(f"失败: {fail_count} ✗")
        print(f"警告: {warn_count} ⚠")

        # Guard against division by zero when nothing was recorded.
        success_rate = (pass_count / total_count * 100) if total_count > 0 else 0
        print(f"成功率: {success_rate:.1f}%")

        if fail_count == 0:
            print("\n🎉 所有核心测试通过!流式语音识别系统基本功能正常。")
        else:
            print(f"\n⚠️  有{fail_count}个测试失败,请检查相关功能。")

        return fail_count == 0

def main():
    """Run the whole test suite from the command line.

    Prints a banner, runs every suite of ``StreamingRecognitionTester`` and
    prints follow-up guidance depending on the outcome.

    Returns:
        int: Process exit status: ``0`` when no check failed, ``1`` when at
        least one check failed or an unexpected exception occurred, ``130``
        when the run was interrupted with Ctrl+C.
    """
    print("流式语音识别系统测试工具")
    print("作者: AIfeng")
    print("时间: 2025-07-07 09:34:55")
    print()

    tester = StreamingRecognitionTester()

    try:
        success = tester.run_all_tests()

        if success:
            print("\n✅ 测试完成,系统准备就绪!")
            print("\n下一步可以:")
            print("1. 运行 streaming_recorder.py 进行实际录音测试")
            print("2. 集成到现有的应用中")
            print("3. 根据需要调整参数配置")
            return 0

        print("\n❌ 测试发现问题,请修复后重新测试")
        return 1

    except KeyboardInterrupt:
        print("\n测试被用户中断")
        return 130  # conventional shell status for termination by SIGINT
    except Exception as e:
        print(f"\n测试过程中发生异常: {e}")
        logger.error(f"测试异常: {e}", exc_info=True)
        return 1


if __name__ == "__main__":
    # Propagate the outcome so shells and CI pipelines can detect failures.
    sys.exit(main())