# usage_example.py (12.3 KB)
# AIfeng/2025-07-07 15:25:48
# Usage examples for the streaming speech recognition optimization modules

import time
import logging
import asyncio
from typing import List, Dict

from .optimization_manager import OptimizationManager, OptimizationMode
from .intelligent_segmentation import IntelligentSentenceSegmentation
from .adaptive_vad_chunking import AdaptiveVADChunking, ChunkStrategy
from .recognition_result_tracker import RecognitionResultTracker, ResultType
from .streaming_display_manager import StreamingDisplayManager, UpdateType, DisplayPriority

# Configure logging once at import time: INFO level, timestamped records.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger used by everything defined in this file.
logger = logging.getLogger(__name__)

class StreamingRecognitionDemo:
    """Demo driver for the streaming speech recognition optimization modules.

    The instance owns one ``OptimizationManager``, registers result, error
    and metrics callbacks on it, and offers several ``demo_*`` methods that
    push silent mock audio (or canned text) through the modules and log
    whatever comes back. ``run_all_demos`` executes them in order and always
    shuts the manager down afterwards.
    """

    # Sample rate (Hz) passed to every ``process_audio`` call in the demos.
    _SAMPLE_RATE = 16000

    def __init__(self):
        """Create the optimization manager and hook up the demo callbacks."""
        self.optimization_manager = OptimizationManager()

        # Route manager events to the logging callbacks defined below.
        self.optimization_manager.register_result_callback(self.on_recognition_result)
        self.optimization_manager.register_error_callback(self.on_error)
        self.optimization_manager.register_metrics_callback(self.on_metrics_update)

        # session_id -> list of result dicts collected by on_recognition_result.
        self.session_results = {}

        logger.info("流式语音识别演示初始化完成")

    @staticmethod
    def _make_silence(duration_s, sample_rate, bytes_per_sample=2):
        """Return ``duration_s`` seconds of all-zero mock audio as ``bytes``.

        The default of 2 bytes per sample follows ``demo_basic_usage``, which
        sizes its buffer as "16-bit audio". Using one helper keeps every demo
        consistent: a "1 second" buffer at 16 kHz is always 32000 bytes.

        Args:
            duration_s: Length of the buffer in seconds.
            sample_rate: Samples per second.
            bytes_per_sample: Width of one sample in bytes.

        Returns:
            A ``bytes`` object of ``int(sample_rate * duration_s *
            bytes_per_sample)`` zero bytes (empty if that product is <= 0).
        """
        return b'\x00' * int(sample_rate * duration_s * bytes_per_sample)

    def on_recognition_result(self, session_id: str, text: str, confidence: float, is_final: bool):
        """Log a recognition result and remember it under its session.

        Registered with the manager through ``register_result_callback``.

        Args:
            session_id: Session the result belongs to.
            text: Recognized text.
            confidence: Confidence value, logged with two decimals.
            is_final: True for a final result, False for a partial one.
        """
        result_type = "最终" if is_final else "部分"
        logger.info(f"[{session_id}] {result_type}识别结果: {text} (置信度: {confidence:.2f})")

        # setdefault creates the per-session list on first use.
        self.session_results.setdefault(session_id, []).append({
            'text': text,
            'confidence': confidence,
            'is_final': is_final,
            'timestamp': time.time()
        })

    def on_error(self, session_id: str, error: Exception):
        """Log an error reported for ``session_id``.

        Registered with the manager through ``register_error_callback``.
        """
        logger.error(f"[{session_id}] 处理错误: {error}")

    def on_metrics_update(self, session_id: str, metrics):
        """Log the latency, accuracy and confidence carried by ``metrics``.

        Registered with the manager through ``register_metrics_callback``.
        ``metrics`` must expose ``total_latency_ms``, ``accuracy_score`` and
        ``confidence_score`` as numbers.
        """
        logger.info(f"[{session_id}] 性能指标 - 总延迟: {metrics.total_latency_ms:.1f}ms, "
                   f"精度: {metrics.accuracy_score:.2f}, 置信度: {metrics.confidence_score:.2f}")

    def demo_basic_usage(self):
        """Run one session end to end: create, feed five chunks, read results.

        Returns early (after logging an error) when the session cannot be
        created. The session is completed even if processing raises.
        """
        logger.info("=== 基本使用演示 ===")

        session_id = "demo_session_1"

        if not self.optimization_manager.create_session(session_id):
            logger.error("创建会话失败")
            return

        sample_rate = self._SAMPLE_RATE
        mock_audio_data = self._make_silence(2.0, sample_rate)  # 2 s of silence

        try:
            for i in range(5):  # five mock chunks
                logger.info(f"处理第 {i+1} 个音频块")
                self.optimization_manager.process_audio(
                    session_id,
                    mock_audio_data,
                    sample_rate,
                    time.time()
                )
                time.sleep(0.5)  # pacing between chunks

            # Give the manager time to finish before reading the results.
            time.sleep(2)

            results = self.optimization_manager.get_session_results(session_id)
            logger.info(f"会话结果数量: {len(results)}")
        finally:
            # Always release the session, even when processing failed.
            self.optimization_manager.complete_session(session_id)

        logger.info("基本使用演示完成")

    def demo_optimization_modes(self):
        """Process one second of mock audio under each optimization mode.

        For every mode a dedicated session is created, one chunk is fed, and
        the manager's average latency statistic is logged. A mode whose
        session cannot be created is skipped after logging an error.
        """
        logger.info("=== 优化模式演示 ===")

        modes = (
            OptimizationMode.SPEED_FIRST,
            OptimizationMode.ACCURACY_FIRST,
            OptimizationMode.BALANCED,
            OptimizationMode.ADAPTIVE,
        )
        sample_rate = self._SAMPLE_RATE
        mock_audio_data = self._make_silence(1.0, sample_rate)  # 1 s of silence

        for mode in modes:
            logger.info(f"\n--- 测试 {mode.value} 模式 ---")

            self.optimization_manager.set_optimization_mode(mode)

            session_id = f"demo_session_{mode.value}"

            # Same failure handling as demo_basic_usage.
            if not self.optimization_manager.create_session(session_id):
                logger.error("创建会话失败")
                continue

            try:
                start_time = time.time()
                self.optimization_manager.process_audio(
                    session_id,
                    mock_audio_data,
                    sample_rate,
                    start_time
                )

                # Wait for processing before sampling the statistics.
                time.sleep(1)

                stats = self.optimization_manager.get_performance_stats()
                logger.info(f"平均延迟: {stats.get('average_latency_ms', 0):.1f}ms")
            finally:
                self.optimization_manager.complete_session(session_id)

        logger.info("优化模式演示完成")

    def demo_individual_modules(self):
        """Exercise each module on its own, outside the optimization manager.

        Runs, in order: sentence segmentation, adaptive VAD chunking, result
        tracking, and streaming display management.
        """
        logger.info("=== 单独模块演示 ===")

        self._demo_segmentation()
        self._demo_chunking()
        self._demo_result_tracking()
        self._demo_display()

        logger.info("单独模块演示完成")

    def _demo_segmentation(self):
        """Feed three sample sentences to the segmentation module and log the segments."""
        logger.info("\n--- 智能断句模块 ---")
        segmentation = IntelligentSentenceSegmentation()

        session_id = "seg_demo"
        segmentation.create_session(session_id)

        test_texts = [
            "你好,今天天气很好",
            "我想要预订一张明天的机票",
            "请问现在几点了?谢谢"
        ]

        try:
            for i, text in enumerate(test_texts):
                # Timestamps are spaced one second apart; 0.9 is the confidence.
                segments = segmentation.process_text(
                    session_id, text, time.time() + i, 0.9
                )
                for segment in segments:
                    logger.info(f"断句结果: {segment.text} (类型: {segment.segment_type.value})")
        finally:
            segmentation.complete_session(session_id)

    def _demo_chunking(self):
        """Chunk one second of mock audio under each chunking strategy and log the chunks."""
        logger.info("\n--- 自适应VAD分片模块 ---")
        chunking = AdaptiveVADChunking()

        session_id = "chunk_demo"
        chunking.create_session(session_id)

        strategies = (
            ChunkStrategy.FAST_RESPONSE,
            ChunkStrategy.HIGH_ACCURACY,
            ChunkStrategy.BALANCED,
        )
        sample_rate = self._SAMPLE_RATE
        # The buffer does not depend on the strategy, so build it once.
        mock_audio = self._make_silence(1.0, sample_rate)  # 1 s of silence

        try:
            for strategy in strategies:
                logger.info(f"测试策略: {strategy.value}")
                chunking.set_strategy(strategy)

                chunks = chunking.process_audio(session_id, mock_audio, sample_rate)

                logger.info(f"生成分片数量: {len(chunks)}")
                for chunk in chunks:
                    logger.info(f"分片 {chunk.chunk_id}: 语音={chunk.is_speech}, "
                               f"大小={len(chunk.audio_data)}字节")
        finally:
            chunking.complete_session(session_id)

    def _demo_result_tracking(self):
        """Add three successive results to the tracker, link the first two, read the chain."""
        logger.info("\n--- 识别结果追踪模块 ---")
        tracker = RecognitionResultTracker()

        session_id = "track_demo"
        tracker.create_session(session_id)

        # (text, confidence, is_final): two partial results, then the final one.
        result_texts = [
            ("你好", 0.8, False),
            ("你好,今天", 0.85, False),
            ("你好,今天天气很好", 0.9, True)
        ]

        try:
            result_ids = []
            for text, confidence, is_final in result_texts:
                result_id = tracker.add_recognition_result(
                    session_id, text, confidence, is_final,
                    time.time(), time.time() + 1
                )
                result_ids.append(result_id)
                logger.info(f"添加结果: {text} (ID: {result_id})")

            # Link the first result to the second as a "refinement".
            if len(result_ids) >= 2:
                tracker.add_result_relation(
                    result_ids[0], result_ids[1], "refinement"
                )
                logger.info(f"建立关联: {result_ids[0]} -> {result_ids[1]}")

            if result_ids:
                chain = tracker.get_result_chain(result_ids[0])
                logger.info(f"结果链长度: {len(chain)}")
        finally:
            tracker.complete_session(session_id)

    def _demo_display(self):
        """Push three display updates through the display manager and log the final state."""
        logger.info("\n--- 流式显示管理模块 ---")
        display = StreamingDisplayManager()

        def display_callback(session_id, segments):
            """Log the segment count and the last three segments of an update."""
            logger.info(f"显示更新 [{session_id}]: {len(segments)} 个片段")
            for segment in segments[-3:]:
                logger.info(f"  片段: {segment.content} (置信度: {segment.confidence:.2f})")

        try:
            display.register_display_callback(display_callback)

            session_id = "display_demo"

            # (text, update type, priority): append, partial replace, final replace.
            test_updates = [
                ("你好", UpdateType.APPEND, DisplayPriority.NORMAL),
                ("你好,今天", UpdateType.REPLACE_PARTIAL, DisplayPriority.NORMAL),
                ("你好,今天天气很好", UpdateType.REPLACE_FINAL, DisplayPriority.HIGH)
            ]

            for i, (text, update_type, priority) in enumerate(test_updates):
                display.update_display(
                    session_id, f"segment_{i}", text,
                    update_type, 0.8 + i * 0.05,
                    update_type == UpdateType.REPLACE_FINAL,  # final flag
                    priority
                )
                time.sleep(0.2)

            # Wait for the display updates to be delivered.
            time.sleep(1)

            segments = display.get_session_display(session_id)
            logger.info(f"最终显示片段数量: {len(segments)}")
        finally:
            # Shut the display manager down even if an update failed.
            display.shutdown()

    def demo_performance_monitoring(self):
        """Feed ten chunks into one session, then log statistics and metrics.

        Returns early (after logging an error) when the session cannot be
        created. The session is completed even if processing raises.
        """
        logger.info("=== 性能监控演示 ===")

        session_id = "perf_demo"
        # Same failure handling as demo_basic_usage.
        if not self.optimization_manager.create_session(session_id):
            logger.error("创建会话失败")
            return

        sample_rate = self._SAMPLE_RATE
        mock_audio = self._make_silence(1.0, sample_rate)  # 1 s of silence

        try:
            for _ in range(10):
                self.optimization_manager.process_audio(
                    session_id, mock_audio, sample_rate, time.time()
                )
                time.sleep(0.1)

            # Wait for processing before sampling the statistics.
            time.sleep(2)

            stats = self.optimization_manager.get_performance_stats()
            logger.info("=== 性能统计 ===")
            logger.info(f"总会话数: {stats.get('total_sessions', 0)}")
            logger.info(f"活跃会话数: {stats.get('active_sessions', 0)}")
            logger.info(f"平均延迟: {stats.get('average_latency_ms', 0):.1f}ms")

            # Per-session metrics may be unavailable; log them only when present.
            metrics = self.optimization_manager.get_optimization_metrics(session_id)
            if metrics:
                logger.info("=== 优化指标 ===")
                logger.info(f"总延迟: {metrics.total_latency_ms:.1f}ms")
                logger.info(f"分片延迟: {metrics.chunking_latency_ms:.1f}ms")
                logger.info(f"断句延迟: {metrics.segmentation_latency_ms:.1f}ms")
                logger.info(f"追踪延迟: {metrics.tracking_latency_ms:.1f}ms")
                logger.info(f"显示延迟: {metrics.display_latency_ms:.1f}ms")
        finally:
            self.optimization_manager.complete_session(session_id)

        logger.info("性能监控演示完成")

    def run_all_demos(self):
        """Run every demo in order, pausing one second between them.

        Any exception raised by a demo stops the remaining demos and is
        logged with its traceback; it is not re-raised. The optimization
        manager is shut down in all cases.
        """
        logger.info("开始运行流式语音识别优化模块演示")

        demos = (
            self.demo_basic_usage,
            self.demo_optimization_modes,
            self.demo_individual_modules,
            self.demo_performance_monitoring,
        )

        try:
            for index, demo in enumerate(demos):
                if index:
                    time.sleep(1)  # pause between consecutive demos
                demo()

        except Exception as e:
            # logger.exception keeps the traceback, which a bare message loses.
            logger.exception(f"演示过程中出错: {e}")

        finally:
            self.optimization_manager.shutdown()
            logger.info("演示完成,优化管理器已关闭")

def main():
    """Entry point: build the demo driver and run every demo in sequence."""
    StreamingRecognitionDemo().run_all_demos()

# Script entry point. NOTE(review): this module uses package-relative imports
# (``from .optimization_manager import ...``), so it has to be launched as a
# module inside its package (``python -m <package>.usage_example``); running
# the file by path fails on those imports.
if __name__ == "__main__":
    main()