# streaming_demo.py (21.3 KB)
# AIfeng/2025-07-07 09:34:55
# 流式语音识别演示应用
# 展示完整的流式语音识别功能,包括实时VAD、累积识别和结果管理

import sys
import os
import json
import time
import threading
from typing import Optional, Dict, Any

# Add the directory containing this script (assumed to be the project root)
# to sys.path so the project-local modules below resolve when run directly.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# Project-local dependencies; if any is missing, print a hint and exit(1)
# instead of failing later with an obscure NameError.
try:
    from streaming.streaming_recorder import StreamingRecorder
    from streaming.optimization.optimization_manager import OptimizationMode
    from logger import get_logger
    from funasr_asr_sync import FunASRSync
except ImportError as e:
    print(f"导入模块失败: {e}")
    print("请确保所有依赖模块都已正确安装")
    sys.exit(1)

# Module-level logger obtained from the project's logger helper.
logger = get_logger("StreamingDemo")

class StreamingRecognitionDemo:
    """Interactive console demo for streaming speech recognition.

    Wires a ``StreamingRecorder`` to console callbacks that print partial and
    final results, keeps simple per-run statistics, and runs a single-key
    command loop (see ``_print_help``).
    """

    def __init__(self, config_path: str = "streaming/streaming_config.json"):
        """Load configuration and initialise empty runtime state.

        Args:
            config_path: Path to a JSON config file. A missing or unreadable
                file falls back to ``_get_default_config()``.
        """
        self.config = self._load_config(config_path)
        self.recorder: Optional[StreamingRecorder] = None
        self.asr_client: Optional[FunASRSync] = None
        self.is_running = False
        self.selected_device_index = None  # None -> use the default input device
        # session_id -> {'partial_results': [...], 'final_result': dict | None, 'start_time': float}
        self.session_results = {}
        self.stats = {
            'sessions_created': 0,
            'partial_results': 0,
            'final_results': 0,
            'total_speech_duration': 0.0,
            'start_time': None,
            'optimization_metrics': {}
        }
        self.current_optimization_mode = OptimizationMode.BALANCED

    def _load_config(self, config_path: str) -> Dict[str, Any]:
        """Read the JSON config at *config_path*, falling back to defaults.

        Any failure to open, decode or parse the file, or a top-level JSON
        value that is not an object, is logged and the built-in defaults are
        returned so the demo can always start.
        """
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)
        except FileNotFoundError:
            logger.warning(f"配置文件未找到: {config_path},使用默认配置")
            return self._get_default_config()
        except json.JSONDecodeError as e:
            logger.error(f"配置文件格式错误: {e}")
            return self._get_default_config()
        except (OSError, UnicodeDecodeError) as e:
            # e.g. permission denied, path is a directory, or not UTF-8 text.
            logger.error(f"配置文件读取失败: {e}")
            return self._get_default_config()

        # Every consumer calls .get() on the config, so the root must be an object.
        if not isinstance(config, dict):
            logger.error(f"配置文件格式错误: 顶层必须是JSON对象 ({config_path})")
            return self._get_default_config()

        logger.info(f"配置加载成功: {config_path}")
        return config

    def _get_default_config(self) -> Dict[str, Any]:
        """Return a fresh copy of the built-in default configuration."""
        return {
            "streaming_vad": {
                "sample_rate": 16000,
                "chunk_size": 1024,
                "volume_threshold": 0.03,
                "silence_duration": 1.0,
                "min_speech_duration": 0.3,
                "max_speech_duration": 15.0,
                "partial_result_interval": 2.0
            },
            "streaming_recognition": {
                "confidence_threshold": 0.6,
                "max_session_duration": 30.0,
                "result_merge_window": 1.0
            },
            "streaming_recorder": {
                "audio": {
                    "rate": 16000,
                    "chunk": 1024,
                    "channels": 1
                }
            }
        }

    def _setup_asr_client(self) -> bool:
        """Compatibility hook; the ASR client is owned by ``StreamingRecorder``.

        Always returns True.
        """
        # StreamingRecorder creates and manages its own FunASRSync client;
        # this method only exists to keep the start-up sequence unchanged.
        logger.info("ASR客户端将由StreamingRecorder内部管理")
        return True

    def _setup_recorder(self) -> bool:
        """Create ``self.recorder`` from the config and attach the callbacks.

        Keys missing from a config section fall back to the values in
        ``_get_default_config()``, so file-based and built-in configurations
        agree on every default.

        Returns:
            True on success, False if building the recorder raised.
        """
        try:
            defaults = self._get_default_config()
            vad_config = {**defaults["streaming_vad"],
                          **self.config.get("streaming_vad", {})}
            recognition_config = {**defaults["streaming_recognition"],
                                  **self.config.get("streaming_recognition", {})}
            audio_config = {**defaults["streaming_recorder"]["audio"],
                            **self.config.get("streaming_recorder", {}).get("audio", {})}

            self.recorder = StreamingRecorder(
                chunk=audio_config["chunk"],
                rate=audio_config["rate"],
                channels=audio_config["channels"],
                volume_threshold=vad_config["volume_threshold"],
                silence_duration=vad_config["silence_duration"],
                min_speech_duration=vad_config["min_speech_duration"],
                max_speech_duration=vad_config["max_speech_duration"],
                partial_result_interval=vad_config["partial_result_interval"],
                confidence_threshold=recognition_config["confidence_threshold"],
                max_session_duration=recognition_config["max_session_duration"],
                result_merge_window=recognition_config["result_merge_window"],
                username="streaming_demo",
                config=self.config
            )

            # Route recorder events to the console handlers below.
            self.recorder.on_partial_result = self._on_partial_result
            self.recorder.on_final_result = self._on_final_result
            self.recorder.on_session_complete = self._on_session_complete
            self.recorder.on_status_update = self._on_status_update

            logger.info("StreamingRecorder设置完成,已连接ASR服务")
            return True

        except Exception as e:
            logger.error(f"录音器设置失败: {e}", exc_info=True)
            return False

    def _ensure_session(self, session_id: str) -> Dict[str, Any]:
        """Return the result record for *session_id*, creating it on first use.

        ``start_time`` is the time the first result (partial or final) arrived.
        """
        session = self.session_results.get(session_id)
        if session is None:
            session = {
                'partial_results': [],
                'final_result': None,
                'start_time': time.time()
            }
            self.session_results[session_id] = session
        return session

    def _on_partial_result(self, session_id: str, text: str, confidence: float):
        """Record a partial result and redraw it in place on the console."""
        self.stats['partial_results'] += 1

        self._ensure_session(session_id)['partial_results'].append({
            'text': text,
            'confidence': confidence,
            'timestamp': time.time()
        })

        # '\r' without a newline overwrites the previous partial line.
        print(f"\r[部分] {text} (置信度: {confidence:.2f})", end="", flush=True)

    def _on_final_result(self, session_id: str, text: str, confidence: float):
        """Record a final result, even if no partial preceded it, and print it."""
        self.stats['final_results'] += 1

        # Create the record on demand so finals are never silently dropped.
        self._ensure_session(session_id)['final_result'] = {
            'text': text,
            'confidence': confidence,
            'timestamp': time.time()
        }

        print(f"\n[最终] {text} (置信度: {confidence:.2f})")

    def _on_session_complete(self, session_id: str, final_text: str):
        """Print the completed session text and accumulate its duration.

        The duration is only known (and added to the statistics) for sessions
        that produced at least one result through the callbacks above.
        """
        session_data = self.session_results.get(session_id)

        print(f"\n[会话完成] {final_text}")
        if session_data is not None:
            duration = time.time() - session_data['start_time']
            self.stats['total_speech_duration'] += duration
            print(f"[会话时长] {duration:.1f}秒")
        print("-" * 50)

    def _on_status_update(self, status: Dict[str, Any]):
        """Handle a status event from the recorder.

        Handled ``status['type']`` values: ``vad_status``,
        ``optimization_metrics``, ``optimization_error`` and
        ``optimization_mode_changed``; any other type is ignored.
        """
        status_type = status.get('type')

        if status_type == 'vad_status':
            # Throttle the voice-activity line to at most once per second.
            # The first event only seeds the timestamp; nothing is printed.
            if status.get('is_speaking') and hasattr(self, '_last_vad_update'):
                if time.time() - self._last_vad_update > 1.0:
                    volume = status.get('volume', 0)
                    print(f"\n[语音检测] 音量: {volume:.3f}", end="")
                    self._last_vad_update = time.time()
            elif not hasattr(self, '_last_vad_update'):
                self._last_vad_update = time.time()

        elif status_type == 'optimization_metrics':
            # Keep the latest metrics per session for the summary in _print_status.
            session_id = status.get('session_id', 'unknown')
            metrics = status.get('metrics', {})
            self.stats['optimization_metrics'][session_id] = metrics

            latency = metrics.get('total_latency_ms', 0)
            accuracy = metrics.get('accuracy_score', 0)
            if latency > 0:
                print(f"\n[优化指标] 延迟: {latency:.1f}ms, 精度: {accuracy:.2f}")

        elif status_type == 'optimization_error':
            session_id = status.get('session_id', 'unknown')
            error_type = status.get('error_type', 'unknown')
            message = status.get('message', 'Unknown error')
            print(f"\n[优化错误] [{session_id}] {error_type}: {message}")

        elif status_type == 'optimization_mode_changed':
            mode = status.get('mode', 'unknown')
            print(f"\n[优化模式] 已切换到: {mode}")

    def _print_status(self):
        """Print recorder/VAD state, run statistics and optimisation averages."""
        if not self.recorder:
            return

        status = self.recorder.get_status()
        vad_status = status.get('vad_status', {})
        recognition_status = status.get('recognition_status', {})

        print("\n=== 系统状态 ===")
        print(f"录音状态: {'录音中' if status.get('is_recording') else '未录音'}")
        print(f"语音检测: {'检测到语音' if vad_status.get('is_speaking') else '静音'}")
        print(f"当前音量: {vad_status.get('volume', 0):.3f}")
        print(f"动态阈值: {vad_status.get('dynamic_threshold', 0):.3f}")
        print(f"活跃会话: {recognition_status.get('active_sessions_count', 0)}")

        if self.stats['start_time']:
            runtime = time.time() - self.stats['start_time']
            print("\n=== 统计信息 ===")
            print(f"运行时间: {runtime:.1f}秒")
            print(f"创建会话: {self.stats['sessions_created']}")
            print(f"部分结果: {self.stats['partial_results']}")
            print(f"最终结果: {self.stats['final_results']}")
            print(f"总语音时长: {self.stats['total_speech_duration']:.1f}秒")
            if runtime > 0:
                print(f"平均处理速度: {self.stats['total_speech_duration']/runtime:.2f}x实时")

            print("\n=== 优化统计 ===")
            print(f"当前优化模式: {self.current_optimization_mode.value}")
            if self.stats['optimization_metrics']:
                total_latency = 0
                total_accuracy = 0
                count = 0
                # Only sessions that reported a positive latency are averaged.
                for metrics in self.stats['optimization_metrics'].values():
                    latency = metrics.get('total_latency_ms', 0)
                    if latency > 0:
                        total_latency += latency
                        total_accuracy += metrics.get('accuracy_score', 0)
                        count += 1

                if count > 0:
                    print(f"平均延迟: {total_latency / count:.1f}ms")
                    print(f"平均精度: {total_accuracy / count:.2f}")
                    print(f"优化会话数: {count}")

    def _print_help(self):
        """Print the list of console commands."""
        print("\n=== 控制命令 ===")
        print("s - 开始/停止录音")
        print("t - 显示状态信息")
        print("d - 列出音频设备")
        print("x - 选择音频设备")
        print("o - 切换优化模式")
        print("c - 清除屏幕")
        print("h - 显示帮助")
        print("q - 退出程序")
        print("=" * 30)

    def _fetch_audio_devices(self, username: str):
        """Return the device list reported by a throwaway ``StreamingRecorder``.

        A temporary recorder is used so enumeration works before
        ``self.recorder`` exists; *username* only labels that instance.
        Each entry is read elsewhere via the keys 'index', 'name',
        'channels' and 'sample_rate'.
        """
        temp_recorder = StreamingRecorder(
            chunk=1024,
            rate=16000,
            channels=1,
            username=username
        )
        return temp_recorder.list_audio_devices()

    def _list_audio_devices(self):
        """Print every audio device and mark the currently selected one."""
        try:
            devices = self._fetch_audio_devices("temp_device_list")
            print("\n=== 音频设备列表 ===")
            for device in devices:
                status = " [当前选择]" if device['index'] == self.selected_device_index else ""
                print(f"设备 {device['index']}: {device['name']}{status}")
                print(f"  输入声道: {device['channels']}")
                print(f"  默认采样率: {device['sample_rate']}")
                print()
        except Exception as e:
            print(f"获取音频设备失败: {e}")

    def _switch_optimization_mode(self):
        """Prompt for a new ``OptimizationMode`` and apply it.

        The live recorder is updated before the demo's own state, so a failure
        there leaves ``current_optimization_mode`` unchanged.
        """
        modes = list(OptimizationMode)

        print("\n=== 优化模式切换 ===")
        print(f"当前模式: {self.current_optimization_mode.value}")
        print("\n可用模式:")
        for number, mode in enumerate(modes, start=1):
            marker = " [当前]" if mode == self.current_optimization_mode else ""
            print(f"{number}. {mode.value}{marker}")

        try:
            choice = input("\n请选择模式编号 (回车保持当前): ").strip()

            if choice == "":
                print("保持当前优化模式")
                return

            mode_index = int(choice) - 1
            if not 0 <= mode_index < len(modes):
                print(f"❌ 无效的模式编号: {choice}")
                return

            new_mode = modes[mode_index]
            if self.recorder and hasattr(self.recorder, 'optimization_manager'):
                self.recorder.set_optimization_mode(new_mode)
            self.current_optimization_mode = new_mode

            print(f"✅ 已切换到优化模式: {new_mode.value}")

        except ValueError:
            print("❌ 请输入有效的数字")
        except Exception as e:
            print(f"❌ 切换优化模式时出错: {e}")

    def _toggle_recording(self):
        """Stop recording if active, otherwise start it on the selected device."""
        if self.recorder.is_recording():
            print("停止录音...")
            self.recorder.stop_recording()
            print("✅ 录音已停止")
            # Give the recorder a moment to deliver the last final results.
            time.sleep(0.5)
            return

        print("开始录音...")
        if self.selected_device_index is not None:
            print(f"使用设备: {self.selected_device_index}")

        if self.recorder.start_recording(device_index=self.selected_device_index):
            self.stats['sessions_created'] += 1
            print("✅ 录音已开始,请说话...")
            print("(说话时会显示实时识别结果)")
        else:
            print("❌ 录音启动失败")

    def start(self):
        """Initialise components and run the interactive command loop.

        Returns:
            False if initialisation failed, True after the loop ends (by 'q',
            Ctrl+C, end of input or an unexpected error). Resources are
            always released via ``_cleanup`` once the loop has started.
        """
        print("流式语音识别演示应用")
        print("作者: AIfeng")
        print("时间: 2025-07-07 09:34:55")
        print("=" * 50)

        print("正在初始化...")

        if not self._setup_asr_client():
            print("❌ ASR客户端初始化失败")
            return False

        if not self._setup_recorder():
            print("❌ 录音器初始化失败")
            return False

        print("✅ 初始化完成")

        self.is_running = True
        self.stats['start_time'] = time.time()

        self._print_help()

        try:
            while self.is_running:
                command = input("\n请输入命令 (h=帮助): ").strip().lower()

                if command == 'q':
                    break
                elif command == 's':
                    self._toggle_recording()
                elif command == 't':
                    self._print_status()
                elif command == 'd':
                    self._list_audio_devices()
                elif command == 'x':
                    self._select_audio_device()
                elif command == 'o':
                    self._switch_optimization_mode()
                elif command == 'c':
                    os.system('cls' if os.name == 'nt' else 'clear')
                elif command == 'h':
                    self._print_help()
                elif command == '':
                    continue
                else:
                    print(f"未知命令: {command},输入 'h' 查看帮助")

        except KeyboardInterrupt:
            print("\n程序被用户中断")
        except EOFError:
            # stdin closed (e.g. piped input exhausted): treat as a normal quit.
            print("\n输入已结束,退出程序")
        except Exception as e:
            print(f"\n程序异常: {e}")
            logger.error(f"主循环异常: {e}", exc_info=True)
        finally:
            self._cleanup()

        return True

    def _average_final_confidence(self) -> Optional[float]:
        """Return the mean confidence of recorded final results, or None if none."""
        confidences = [
            session['final_result']['confidence']
            for session in self.session_results.values()
            if session.get('final_result')
        ]
        if not confidences:
            return None
        return sum(confidences) / len(confidences)

    def _cleanup(self):
        """Stop any active recording, mark the demo stopped and print a summary."""
        print("\n正在清理资源...")

        if self.recorder and self.recorder.is_recording():
            print("正在停止录音...")
            self.recorder.stop_recording()
            time.sleep(0.5)  # let the recorder finish shutting down
            print("✅ 录音已停止")

        # The ASR client is owned by StreamingRecorder; nothing to close here.

        self.is_running = False
        print("✅ 资源清理完成")

        if self.stats['start_time']:
            runtime = time.time() - self.stats['start_time']
            print("\n=== 会话总结 ===")
            print(f"总运行时间: {runtime:.1f}秒")
            print(f"创建会话数: {self.stats['sessions_created']}")
            print(f"部分结果数: {self.stats['partial_results']}")
            print(f"最终结果数: {self.stats['final_results']}")
            print(f"总语音时长: {self.stats['total_speech_duration']:.1f}秒")

            # Average over the finals actually stored, not the raw callback count.
            avg_confidence = self._average_final_confidence()
            if avg_confidence is not None:
                print(f"平均置信度: {avg_confidence:.2f}")

    def _select_audio_device(self):
        """Prompt the user to pick an input device by index.

        An empty answer resets to the default device. Choosing a specific
        device stops any in-progress recording and rebuilds the recorder,
        reporting whether the rebuild succeeded.
        """
        try:
            devices = self._fetch_audio_devices("temp_device_select")

            if not devices:
                print("未找到可用的音频设备")
                return

            print("\n=== 选择音频设备 ===")
            for device in devices:
                status = " [当前选择]" if device['index'] == self.selected_device_index else ""
                print(f"{device['index']}: {device['name']}{status}")
        except Exception as e:
            print(f"获取音频设备失败: {e}")
            return

        try:
            choice = input("\n请输入设备编号 (回车使用默认设备): ").strip()

            if choice == "":
                self.selected_device_index = None
                print("✅ 已选择默认音频设备")
                return

            device_index = int(choice)
            devices_by_index = {d['index']: d for d in devices}
            selected_device = devices_by_index.get(device_index)
            if selected_device is None:
                print(f"❌ 无效的设备编号: {device_index}")
                return

            self.selected_device_index = device_index
            print(f"✅ 已选择设备: {selected_device['name']}")

            if self.recorder:
                print("正在重新初始化录音器...")
                if self.recorder.is_recording():
                    print("停止当前录音...")
                    self.recorder.stop_recording()
                    time.sleep(0.5)  # let the recorder finish shutting down

                if self._setup_recorder():
                    print("✅ 录音器已更新")
                else:
                    print("❌ 录音器更新失败")

        except ValueError:
            print("❌ 请输入有效的数字")
        except Exception as e:
            print(f"❌ 选择设备时出错: {e}")

def main():
    """Parse command-line arguments and run the interactive demo.

    Returns:
        Process exit code: 0 after a normal run, 1 if initialisation failed
        (``start()`` returned False) or an unexpected exception escaped.
    """
    import argparse

    parser = argparse.ArgumentParser(description="流式语音识别演示应用")
    parser.add_argument(
        "--config",
        default="streaming/streaming_config.json",
        help="配置文件路径"
    )

    args = parser.parse_args()

    try:
        # Construction loads the config file, so keep it inside the guard too.
        demo = StreamingRecognitionDemo(args.config)
        started = demo.start()
    except Exception as e:
        print(f"启动失败: {e}")
        logger.error(f"启动异常: {e}", exc_info=True)
        return 1

    # start() returns False when the ASR client or recorder failed to initialise.
    if not started:
        return 1

    print("\n感谢使用流式语音识别演示应用!")
    return 0

if __name__ == "__main__":
    # sys.exit is always available; the builtin exit() is injected by the
    # site module and is missing under `python -S` or in frozen apps.
    sys.exit(main())