test_optimization_integration.py 11.4 KB
# AIfeng/2024-12-19 16:30:00
"""
优化模块与流式识别系统集成测试

测试优化模块是否正确集成到流式识别系统中,包括:
1. OptimizationManager 初始化
2. 音频数据流处理
3. 优化模式切换
4. 性能指标收集
5. 错误处理和回退机制
"""

import pytest
import asyncio
import numpy as np
import sys
import os
from unittest.mock import Mock, patch, AsyncMock
from typing import Dict, Any

# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from streaming.streaming_recorder import StreamingRecorder
from streaming.optimization.optimization_manager import OptimizationManager, OptimizationMode
from streaming.streaming_recognition_manager import StreamingRecognitionManager
from streaming.streaming_vad import StreamingVAD


class TestOptimizationIntegration:
    """优化模块集成测试类"""
    
    @pytest.fixture
    def mock_asr_client(self):
        """模拟ASR客户端"""
        client = Mock()
        client.recognize_async = AsyncMock(return_value={
            'text': '测试识别结果',
            'confidence': 0.95,
            'session_id': 'test_session_001'
        })
        return client
    
    @pytest.fixture
    def mock_config(self):
        """模拟配置"""
        return {
            'audio': {
                'sample_rate': 16000,
                'chunk_size': 1024,
                'channels': 1,
                'format': 'int16'
            },
            'vad': {
                'threshold': 0.5,
                'min_speech_duration': 0.3,
                'max_silence_duration': 0.8
            },
            'recognition': {
                'language': 'zh',
                'model': 'paraformer-zh'
            }
        }
    
    @pytest.fixture
    def streaming_recorder(self, mock_asr_client, mock_config):
        """创建流式录音器实例"""
        audio_config = mock_config['audio']
        vad_config = mock_config['vad']
        
        recorder = StreamingRecorder(
            chunk=audio_config['chunk_size'],
            rate=audio_config['sample_rate'],
            channels=audio_config['channels'],
            volume_threshold=vad_config['threshold'],
            min_speech_duration=vad_config['min_speech_duration'],
            silence_duration=vad_config['max_silence_duration'],
            username="test_user"
        )
        
        # 替换ASR客户端为模拟对象
        recorder.asr_client = mock_asr_client
        
        return recorder
    
    def test_optimization_manager_initialization(self, streaming_recorder):
        """测试优化管理器初始化"""
        # 验证优化管理器存在
        assert streaming_recorder.optimization_manager is not None
        assert isinstance(streaming_recorder.optimization_manager, OptimizationManager)
        
        # 验证初始状态
        assert streaming_recorder.optimization_manager.current_mode == OptimizationMode.BALANCED
        
        # 验证回调函数设置
        assert streaming_recorder.optimization_manager.result_callbacks is not None
        assert streaming_recorder.optimization_manager.error_callbacks is not None
        assert streaming_recorder.optimization_manager.metrics_callbacks is not None
    
    def test_optimization_mode_switching(self, streaming_recorder):
        """测试优化模式切换"""
        # 测试切换到速度优先模式
        streaming_recorder.set_optimization_mode(OptimizationMode.SPEED_FIRST)
        assert streaming_recorder.optimization_manager.current_mode == OptimizationMode.SPEED_FIRST
        
        # 测试切换到精度优先模式
        streaming_recorder.set_optimization_mode(OptimizationMode.ACCURACY_FIRST)
        assert streaming_recorder.optimization_manager.current_mode == OptimizationMode.ACCURACY_FIRST
        
        # 测试切换到自适应模式
        streaming_recorder.set_optimization_mode(OptimizationMode.ADAPTIVE)
        assert streaming_recorder.optimization_manager.current_mode == OptimizationMode.ADAPTIVE
    
    def test_audio_processing_with_optimization(self, streaming_recorder):
        """测试带优化的音频处理"""
        # 创建测试音频数据
        audio_data = np.random.randint(-32768, 32767, 1024, dtype=np.int16)
        # 修复 BufferError: memoryview has 1 exported buffer
        audio_bytes = bytes(audio_data.tobytes())
        
        # 创建会话
        session_id = 'test_session_001'
        streaming_recorder.optimization_manager.create_session(session_id)
        
        # 模拟音频处理
        result = streaming_recorder.optimization_manager.process_audio(
            session_id=session_id,
            audio_data=audio_bytes,
            sample_rate=16000,
            timestamp=1234567890.0
        )
        
        # 验证处理结果
        assert result is True  # process_audio返回布尔值表示是否成功提交处理
    
    def test_optimization_result_callback(self, streaming_recorder):
        """测试优化结果回调"""
        # 模拟优化结果
        result = {
            'session_id': 'test_session_001',
            'optimized_audio': b'\x00' * 1024,
            'metadata': {
                'processing_time_ms': 45,
                'optimization_applied': True,
                'confidence_boost': 0.03
            }
        }
        
        # 调用结果回调
        streaming_recorder._on_optimization_result(result)
        
        # 验证结果被正确处理(这里主要验证不抛出异常)
        assert True  # 如果没有异常,测试通过
    
    def test_optimization_error_callback(self, streaming_recorder):
        """测试优化错误回调"""
        # 模拟优化错误
        error_info = {
            'session_id': 'test_session_001',
            'error_type': 'processing_timeout',
            'message': '音频处理超时',
            'timestamp': 1234567890.0
        }
        
        # 调用错误回调
        streaming_recorder._on_optimization_error(error_info)
        
        # 验证错误被正确处理(这里主要验证不抛出异常)
        assert True  # 如果没有异常,测试通过
    
    def test_optimization_metrics_callback(self, streaming_recorder):
        """测试优化指标回调"""
        # 模拟性能指标
        metrics = {
            'session_id': 'test_session_001',
            'metrics': {
                'total_latency_ms': 120,
                'processing_latency_ms': 45,
                'accuracy_score': 0.92,
                'optimization_ratio': 0.15,
                'memory_usage_mb': 25.6
            }
        }
        
        # 调用指标回调
        streaming_recorder._on_optimization_metrics(metrics)
        
        # 验证指标被正确处理(这里主要验证不抛出异常)
        assert True  # 如果没有异常,测试通过
    
    def test_get_optimization_metrics(self, streaming_recorder):
        """测试获取优化指标"""
        # 获取性能统计
        stats = streaming_recorder.optimization_manager.get_performance_stats()
        
        # 验证统计数据结构
        assert isinstance(stats, dict)
        assert 'total_sessions' in stats
        assert 'active_sessions' in stats
        assert 'average_latency_ms' in stats
        
        # 获取优化指标
        metrics = streaming_recorder.optimization_manager.get_optimization_metrics()
        assert isinstance(metrics, dict)
    
    def test_fallback_mechanism(self, streaming_recorder):
        """测试回退机制"""
        # 创建测试音频数据
        audio_data = np.random.randint(-32768, 32767, 1024, dtype=np.int16)
        # 修复 BufferError: memoryview has 1 exported buffer
        audio_bytes = bytes(audio_data.tobytes())
        
        # 模拟优化失败的情况
        with patch.object(streaming_recorder.optimization_manager, 'process_audio', side_effect=Exception("Optimization failed")):
            # 模拟回退处理
            with patch.object(streaming_recorder, '_fallback_audio_processing') as mock_fallback:
                try:
                    streaming_recorder.optimization_manager.process_audio(
                        session_id='test_session_002',
                        audio_data=audio_bytes,
                        sample_rate=16000,
                        timestamp=1234567890.0
                    )
                except Exception:
                    # 触发回退机制
                    context = {
                        'audio_data': audio_bytes,
                        'session_id': 'test_session_002',
                        'result_type': 'final'
                    }
                    streaming_recorder._fallback_audio_processing(context)
                    
                    # 验证回退机制被触发
                    mock_fallback.assert_called_once()
    
    def test_integration_with_vad_and_recognition_manager(self, streaming_recorder):
        """测试与VAD和识别管理器的集成"""
        # 验证VAD集成
        assert hasattr(streaming_recorder, 'vad')
        assert isinstance(streaming_recorder.vad, StreamingVAD)
        
        # 验证识别管理器集成
        assert hasattr(streaming_recorder, 'recognition_manager')
        assert isinstance(streaming_recorder.recognition_manager, StreamingRecognitionManager)
        
        # 验证优化管理器与其他组件的协调
        assert streaming_recorder.optimization_manager is not None
        
        # 测试组件间的回调链
        assert streaming_recorder.vad.on_speech_start is not None
        assert streaming_recorder.vad.on_speech_end is not None
    
    def test_end_to_end_optimization_flow(self, streaming_recorder, mock_asr_client):
        """测试端到端优化流程"""
        # 创建测试音频数据
        audio_data = np.random.randint(-32768, 32767, 1024, dtype=np.int16)
        # 修复 BufferError: memoryview has 1 exported buffer
        audio_bytes = bytes(audio_data.tobytes())
        
        # 创建会话
        session_id = 'test_session_003'
        streaming_recorder.optimization_manager.create_session(session_id)
        
        # 模拟音频处理
        result = streaming_recorder.optimization_manager.process_audio(
            session_id=session_id,
            audio_data=audio_bytes,
            sample_rate=16000,
            timestamp=1234567890.0
        )
        
        # 验证处理成功
        assert result is True
        
        # 验证会话存在
        stats = streaming_recorder.optimization_manager.get_performance_stats()
        assert stats['total_sessions'] >= 1
    
    def test_optimization_configuration_validation(self, streaming_recorder):
        """测试优化配置验证"""
        # 获取优化管理器配置
        config = streaming_recorder.optimization_manager.config
        
        # 验证配置结构
        assert isinstance(config, dict)
        assert 'adaptive_vad_chunking' in config
        assert 'intelligent_segmentation' in config
        
        # 验证配置项结构(不检查enabled字段,因为可能不存在)
        assert isinstance(config['adaptive_vad_chunking'], dict)
        assert isinstance(config['intelligent_segmentation'], dict)
        
        # 验证优化管理器基本功能
        assert hasattr(streaming_recorder.optimization_manager, 'current_mode')
        assert hasattr(streaming_recorder.optimization_manager, 'set_optimization_mode')
        assert hasattr(streaming_recorder.optimization_manager, 'process_audio')