# AIfeng/2025-07-07 09:34:55
# Streaming recorder - integrates streaming VAD and recognition result management
# Core features: cumulative recognition with continuous concatenation, smart speech segmentation, streaming result handling

import pyaudio
import wave
import threading
import time
import tempfile
import os
import uuid
import struct
from datetime import datetime
from typing import Optional, Callable, Dict, Any
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from .streaming_vad import StreamingVAD
from .streaming_recognition_manager import StreamingRecognitionManager
from .optimization.optimization_manager import OptimizationManager, OptimizationMode
from funasr_asr_sync import FunASRSync
from logger import get_logger

logger = get_logger("StreamingRecorder")

class StreamingRecorder:
    """流式录音器
    
    集成流式VAD和识别结果管理器,实现:
    1. 持续拼接的累积识别
    2. 智能语音分段
    3. 部分识别结果和最终识别结果的管理
    4. 异步ASR处理
    5. 实时状态通知
    """
    
    def __init__(self, 
                 chunk=1024, 
                 format=pyaudio.paInt16, 
                 channels=1, 
                 rate=16000,
                 # VAD parameters
                 volume_threshold=0.03,
                 silence_duration=1.5,
                 min_speech_duration=0.5,
                 max_speech_duration=30.0,
                 pre_buffer_duration=0.5,
                 dynamic_threshold_factor=0.8,
                 partial_result_interval=2.0,
                 # Recognition manager parameters
                 confidence_threshold=0.6,
                 max_session_duration=60.0,
                 result_merge_window=1.0,
                 # ASR parameters
                 username="streaming_user",
                 # Configuration parameters
                 config=None):
        """
        初始化流式录音器
        
        Args:
            chunk: 音频块大小
            format: 音频格式
            channels: 声道数
            rate: 采样率
            volume_threshold: 基础音量阈值
            silence_duration: 静音持续时间阈值(秒)
            min_speech_duration: 最小语音持续时间(秒)
            max_speech_duration: 最大语音持续时间(秒)
            pre_buffer_duration: 预缓冲时长(秒)
            dynamic_threshold_factor: 动态阈值因子
            partial_result_interval: 部分识别结果发送间隔(秒)
            confidence_threshold: 置信度阈值
            max_session_duration: 最大会话持续时间(秒)
            result_merge_window: 结果合并时间窗口(秒)
            username: 用户名
        """
        # Audio parameters
        self.chunk = chunk
        self.format = format
        self.channels = channels
        self.rate = rate
        self.username = username
        
        # Audio gain parameters
        recorder_config = config.get('streaming_recorder', {}) if config else {}
        self.volume_gain = recorder_config.get('audio_gain', 3.0)
        self.enable_gain = recorder_config.get('enable_audio_gain', True)
        
        logger.info(f"音频增益配置 - 启用: {self.enable_gain}, 增益倍数: {self.volume_gain}x")
        
        # Recording state
        self.is_recording_flag = False
        self.recording_thread = None
        self.audio = None
        self.stream = None
        
        # Initialize the streaming VAD
        self.vad = StreamingVAD(
            sample_rate=rate,
            chunk_size=chunk,
            volume_threshold=volume_threshold,
            silence_duration=silence_duration,
            min_speech_duration=min_speech_duration,
            max_speech_duration=max_speech_duration,
            pre_buffer_duration=pre_buffer_duration,
            dynamic_threshold_factor=dynamic_threshold_factor,
            partial_result_interval=partial_result_interval
        )
        
        # Initialize the recognition result manager
        self.recognition_manager = StreamingRecognitionManager(
            confidence_threshold=confidence_threshold,
            max_session_duration=max_session_duration,
            result_merge_window=result_merge_window
        )
        
        # Initialize the optimization manager
        self.optimization_manager = OptimizationManager()
        self.optimization_manager.set_optimization_mode(OptimizationMode.BALANCED)
        
        # ASR client
        self.asr_client = FunASRSync(username)
        
        # Current session
        self.current_session_id = None
        self.pending_asr_requests = {}  # tracks in-flight ASR requests
        
        # Register VAD callbacks
        self.vad.on_speech_start = self._on_speech_start
        self.vad.on_speech_continue = self._on_speech_continue
        self.vad.on_speech_end = self._on_speech_end
        self.vad.on_partial_result_ready = self._on_partial_result_ready
        
        # Register recognition manager callbacks
        self.recognition_manager.on_partial_result = self._on_partial_recognition_result
        self.recognition_manager.on_final_result = self._on_final_recognition_result
        self.recognition_manager.on_result_updated = self._on_recognition_result_updated
        self.recognition_manager.on_session_complete = self._on_session_complete
        
        # Register optimization manager callbacks
        self.optimization_manager.register_result_callback(self._on_optimization_result)
        self.optimization_manager.register_error_callback(self._on_optimization_error)
        self.optimization_manager.register_metrics_callback(self._on_optimization_metrics)
        
        # External callback hooks
        self.on_speech_detected: Optional[Callable] = None
        self.on_partial_result: Optional[Callable] = None
        self.on_final_result: Optional[Callable] = None
        self.on_session_complete: Optional[Callable] = None
        self.on_status_update: Optional[Callable] = None
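        # Expected callback signatures, as invoked by this class:
        #   on_speech_detected(event, vad_result)   # event: 'start' | 'continue' | 'end'
        #   on_partial_result(session_id, text, confidence)
        #   on_final_result(session_id, text, confidence)
        #   on_session_complete(session_id, final_text)
        #   on_status_update(status_dict)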
        
        logger.info(f"StreamingRecorder初始化完成 - 用户:{username}")
    
    def _on_speech_start(self, vad_result: Dict[str, Any]):
        """VAD检测到语音开始"""
        logger.info("检测到语音开始,创建新会话")
        
        # Create a new recognition session
        self.current_session_id = str(uuid.uuid4())
        self.recognition_manager.create_session(
            self.current_session_id,
            metadata={
                'start_time': time.time(),
                'speech_duration': vad_result['speech_duration'],
                'volume': vad_result['volume']
            }
        )
        
        # Notify external listeners
        if self.on_speech_detected:
            self.on_speech_detected('start', vad_result)
        
        # Send the initial audio for recognition
        self._send_audio_for_recognition(vad_result['audio_buffer'], 'partial')
    
    def _on_speech_continue(self, vad_result: Dict[str, Any]):
        """VAD检测到语音继续"""
        logger.debug(f"语音继续 - 时长:{vad_result['speech_duration']:.2f}s")
        
        # Notify external listeners
        if self.on_speech_detected:
            self.on_speech_detected('continue', vad_result)
    
    def _on_speech_end(self, vad_result: Dict[str, Any]):
        """VAD检测到语音结束"""
        speech_duration = vad_result['speech_duration']
        audio_buffer_size = len(vad_result['audio_buffer']) if vad_result['audio_buffer'] else 0
        
        logger.info(f"检测到语音结束 - 时长:{speech_duration:.2f}s, 音频缓冲区大小:{audio_buffer_size}帧")
        
        # Detailed debug checks
        if audio_buffer_size == 0:
            logger.warning("语音结束但音频缓冲区为空,跳过处理")
            return
            
        if speech_duration < 0.1:
            logger.warning(f"语音时长过短({speech_duration:.2f}s),可能是噪音")
        
        # Send the final audio for recognition
        logger.debug(f"准备发送最终音频进行识别 - 会话ID:{self.current_session_id}")
        self._send_audio_for_recognition(vad_result['audio_buffer'], 'final')
        
        # Notify external listeners
        if self.on_speech_detected:
            self.on_speech_detected('end', vad_result)
    
    def _on_partial_result_ready(self, vad_result: Dict[str, Any]):
        """VAD准备发送部分识别结果"""
        logger.debug(f"准备发送部分识别结果 - 时长:{vad_result['speech_duration']:.2f}s")
        
        # Send the partial audio for recognition
        self._send_audio_for_recognition(vad_result['audio_buffer'], 'partial')
    
    def _send_audio_for_recognition(self, audio_buffer: list, result_type: str):
        """发送音频数据进行识别(集成优化处理)"""
        if not audio_buffer:
            logger.warning(f"音频缓冲区为空,无法发送{result_type}识别请求")
            return
            
        if not self.current_session_id:
            logger.warning(f"当前会话ID为空,无法发送{result_type}识别请求")
            return
        
        try:
            # Join the buffered frames into a single bytes object
            audio_bytes = b''.join(audio_buffer)
            audio_size_bytes = len(audio_bytes)
            audio_duration = audio_size_bytes / (self.rate * 2)  # 16-bit audio, 2 bytes per sample (assumes mono)
            
            logger.info(f"准备发送{result_type}音频识别 - 会话:{self.current_session_id}, 大小:{audio_size_bytes}字节, 时长:{audio_duration:.2f}s")
            
            # Build the processing context handed to the optimization manager
            processing_context = {
                'session_id': self.current_session_id,
                'audio_data': audio_bytes,
                'sample_rate': self.rate,
                'timestamp': time.time(),
                'result_type': result_type,
                'metadata': {
                    'channels': self.channels,
                    'format': self.format,
                    'chunk_size': self.chunk,
                    'audio_duration': audio_duration,
                    'audio_size_bytes': audio_size_bytes
                }
            }
            
            # Process the audio asynchronously to avoid blocking the recording loop
            logger.debug(f"启动异步音频处理线程 - {result_type}")
            threading.Thread(
                target=self._process_audio_with_optimization,
                args=(processing_context,),
                daemon=True
            ).start()
            
        except Exception as e:
            logger.error(f"音频处理失败: {e}", exc_info=True)
    
    def _process_audio_with_optimization(self, context: Dict[str, Any]):
        """通过优化管理器处理音频"""
        session_id = context['session_id']
        result_type = context['result_type']
        audio_size = len(context['audio_data'])
        
        logger.debug(f"开始优化音频处理 - 会话:{session_id}, 类型:{result_type}, 大小:{audio_size}字节")
        
        try:
            # Hand the audio to the optimization manager (asynchronous; results come back via callbacks)
            logger.debug(f"调用优化管理器处理音频 - {session_id}")
            success = self.optimization_manager.process_audio(
                session_id=context['session_id'],
                audio_data=context['audio_data'],
                sample_rate=context['sample_rate']
            )
            
            logger.info(f"优化管理器处理提交结果: {success} - 会话:{session_id}")
            
            # OptimizationManager.process_audio() is asynchronous; the boolean return value only
            # indicates whether the job was accepted. Actual results arrive via the registered callbacks.
            if not success:
                # Submission failed: fall back to the original processing path
                logger.warning(f"优化管理器处理提交失败,回退到原始处理 - 会话:{session_id}")
                self._fallback_audio_processing(context)
            else:
                # Also run the fallback path so the audio file is saved for debugging
                logger.info(f"优化处理成功提交,同时执行回退处理以保存音频文件 - 会话:{session_id}")
                self._fallback_audio_processing(context)
                
        except Exception as e:
            logger.error(f"优化音频处理失败: {e},回退到原始处理 - 会话:{session_id}", exc_info=True)
            self._fallback_audio_processing(context)
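    # Note: when the optimization manager accepts a job, the fallback path still runs
    # (see above), so external partial/final callbacks may fire more than once for the
    # same speech segment: once via _on_optimization_result and once via the FunASRSync
    # polling in _process_asr_request and the recognition manager.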
    
    def _fallback_audio_processing(self, context: Dict[str, Any]):
        """回退的音频处理方式"""
        session_id = context['session_id']
        result_type = context['result_type']
        audio_size = len(context['audio_data'])
        
        logger.info(f"开始回退音频处理 - 会话:{session_id}, 类型:{result_type}, 大小:{audio_size}字节")
        
        try:
            # Create a timestamped audio file for testing
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3]  # millisecond precision
            temp_filename = os.path.join('cache_data', f'audio_{result_type}_{timestamp}.wav')
            
            logger.debug(f"准备保存音频文件: {temp_filename}")
            
            # Make sure the cache directory exists
            cache_dir = 'cache_data'
            if not os.path.exists(cache_dir):
                logger.info(f"创建缓存目录: {cache_dir}")
                os.makedirs(cache_dir, exist_ok=True)
            else:
                logger.debug(f"缓存目录已存在: {cache_dir}")
            
            # Write the WAV file
            logger.debug(f"开始写入音频文件: {temp_filename}")
            with wave.open(temp_filename, 'wb') as wf:
                wf.setnchannels(self.channels)
                wf.setsampwidth(pyaudio.get_sample_size(self.format))
                wf.setframerate(self.rate)
                wf.writeframes(context['audio_data'])
            
            # Verify the file was written successfully
            if os.path.exists(temp_filename):
                file_size = os.path.getsize(temp_filename)
                logger.info(f"音频文件保存成功: {temp_filename}, 文件大小:{file_size}字节")
            else:
                logger.error(f"音频文件保存失败: {temp_filename}")
                return
            
            # Compute the audio duration
            audio_duration = len(context['audio_data']) / (self.rate * 2)  # 16-bit audio, 2 bytes per sample (assumes mono)
            
            # Generate a request ID and register the pending request
            request_id = str(uuid.uuid4())
            self.pending_asr_requests[request_id] = {
                'session_id': session_id,
                'result_type': result_type,
                'audio_file': temp_filename,
                'audio_duration': audio_duration,
                'timestamp': time.time()
            }
            
            logger.info(f"发送音频进行{result_type}识别: {temp_filename}, 时长:{audio_duration:.2f}s, 请求ID:{request_id}")
            
            # Send to the ASR asynchronously
            logger.debug(f"启动ASR处理线程 - 请求ID:{request_id}")
            threading.Thread(
                target=self._process_asr_request,
                args=(request_id, temp_filename),
                daemon=True
            ).start()
            
        except Exception as e:
            logger.error(f"回退音频处理失败 - 会话:{session_id}: {e}", exc_info=True)
    
    def _on_optimization_result(self, result: Dict[str, Any]):
        """处理优化管理器的结果回调"""
        try:
            session_id = result.get('session_id')
            text = result.get('text', '')
            confidence = result.get('confidence', 0.0)
            result_type = result.get('result_type', 'partial')
            
            logger.info(f"优化处理结果 [{session_id}] ({result_type}): {text}")
            
            # Notify external callbacks
            if result_type == 'partial' and self.on_partial_result:
                self.on_partial_result(session_id, text, confidence)
            elif result_type == 'final' and self.on_final_result:
                self.on_final_result(session_id, text, confidence)
                
        except Exception as e:
            logger.error(f"处理优化结果失败: {e}")
    
    def _on_optimization_error(self, session_id: str, error: Exception):
        """处理优化管理器的错误回调"""
        try:
            error_msg = str(error)
            error_type = type(error).__name__
            
            logger.error(f"优化处理错误 [{session_id}] ({error_type}): {error_msg}")
            
            # Push a status update
            if self.on_status_update:
                self.on_status_update({
                    'type': 'optimization_error',
                    'session_id': session_id,
                    'error_type': error_type,
                    'message': error_msg
                })
                
        except Exception as e:
            logger.error(f"处理优化错误失败: {e}")
    
    def _on_optimization_metrics(self, metrics: Dict[str, Any]):
        """处理优化管理器的性能指标回调"""
        try:
            session_id = metrics.get('session_id', 'unknown')
            latency = metrics.get('total_latency_ms', 0.0)
            accuracy = metrics.get('accuracy_score', 0.0)
            
            logger.debug(f"优化性能指标 [{session_id}]: 延迟={latency:.1f}ms, 精度={accuracy:.2f}")
            
            # Push a status update
            if self.on_status_update:
                self.on_status_update({
                    'type': 'optimization_metrics',
                    'session_id': session_id,
                    'metrics': metrics
                })
                
        except Exception as e:
            logger.error(f"处理优化指标失败: {e}")
    
    def set_optimization_mode(self, mode: OptimizationMode):
        """设置优化模式"""
        try:
            self.optimization_manager.set_optimization_mode(mode)
            logger.info(f"优化模式已设置为: {mode.value}")
            
            # Push a status update
            if self.on_status_update:
                self.on_status_update({
                    'type': 'optimization_mode_changed',
                    'mode': mode.value
                })
                
        except Exception as e:
            logger.error(f"设置优化模式失败: {e}")
    
    def get_optimization_metrics(self) -> Dict[str, Any]:
        """获取优化性能指标"""
        try:
            return self.optimization_manager.get_performance_metrics()
        except Exception as e:
            logger.error(f"获取优化指标失败: {e}")
            return {}
    
    def _process_asr_request(self, request_id: str, audio_file: str):
        """处理ASR识别请求"""
        logger.info(f"开始处理ASR请求 - 请求ID:{request_id}, 音频文件:{audio_file}")
        
        try:
            # Make sure the audio file exists
            if not os.path.exists(audio_file):
                logger.error(f"音频文件不存在: {audio_file}")
                self._handle_asr_error(request_id, f"音频文件不存在: {audio_file}")
                return
            
            # Check the audio file size
            file_size = os.path.getsize(audio_file)
            logger.debug(f"音频文件验证通过 - 大小:{file_size}字节")
            
            # Check the ASR client connection state
            if not hasattr(self.asr_client, 'is_connected') or not self.asr_client.is_connected():
                logger.warning(f"ASR客户端未连接,尝试重新连接")
                try:
                    self.asr_client.start()
                    logger.info(f"ASR客户端重新连接成功")
                except Exception as conn_e:
                    logger.error(f"ASR客户端连接失败: {conn_e}")
                    self._handle_asr_error(request_id, f"ASR客户端连接失败: {conn_e}")
                    return
            
            # Reset the ASR client state
            self.asr_client.done = False
            self.asr_client.finalResults = None
            
            # Send the audio file using the URL format expected by FunASR
            logger.info(f"发送音频文件到FunASR: {audio_file}")
            self.asr_client.send_url(audio_file)
            
            # Wait for the recognition result (may need adjusting to the actual ASR client implementation).
            # FunASRSync is asynchronous, so the result comes back through the client's own state;
            # poll it here and associate the result with this request.
            start_time = time.time()
            timeout = 15.0  # 15-second timeout
            check_interval = 0.2  # polling interval (seconds)
            
            logger.debug(f"开始等待ASR识别结果 - 超时:{timeout}s")
            
            while time.time() - start_time < timeout:
                elapsed = time.time() - start_time
                
                # Check whether the ASR client has finished
                if hasattr(self.asr_client, 'done') and self.asr_client.done:
                    if hasattr(self.asr_client, 'finalResults') and self.asr_client.finalResults:
                        # Retrieve the recognition result
                        result_text = self.asr_client.finalResults
                        logger.info(f"收到ASR识别结果 [{request_id}]: {result_text}")
                        
                        # Handle the recognition result
                        self._handle_asr_result(request_id, result_text)
                        return
                    else:
                        logger.debug(f"ASR处理完成但无识别结果 - 请求ID:{request_id}, 耗时:{elapsed:.2f}s")
                
                # Periodically log the waiting status
                if int(elapsed) % 3 == 0 and elapsed > 0:
                    logger.debug(f"等待ASR识别结果中... 请求ID:{request_id}, 已等待:{elapsed:.1f}s")
                
                time.sleep(check_interval)
            else:
                # while/else: reached only when the loop times out without returning above
                logger.warning(f"ASR识别超时 - 请求ID:{request_id}, 超时时间:{timeout}s")
                self._handle_asr_timeout(request_id)
            
        except Exception as e:
            logger.error(f"处理ASR请求失败 [{request_id}]: {e}", exc_info=True)
            self._handle_asr_error(request_id, str(e))
        finally:
            # Keep the temporary audio file during the testing phase for later analysis
            logger.debug(f"ASR请求处理完成 - 请求ID:{request_id}")
    
    def _handle_asr_result(self, request_id: str, result_text: str):
        """处理ASR识别结果"""
        if request_id not in self.pending_asr_requests:
            logger.warning(f"未找到ASR请求: {request_id}")
            return
        
        request_info = self.pending_asr_requests[request_id]
        session_id = request_info['session_id']
        result_type = request_info['result_type']
        audio_duration = request_info['audio_duration']
        
        logger.info(f"收到ASR结果 [{result_type}]: {result_text}")
        
        # Add the result to the recognition result manager
        if result_type == 'partial':
            self.recognition_manager.add_partial_result(
                session_id, result_text, confidence=0.8, audio_duration=audio_duration
            )
        else:  # final
            self.recognition_manager.add_final_result(
                session_id, result_text, confidence=0.9, audio_duration=audio_duration
            )
            
            # A final result completes the session
            self.recognition_manager.complete_session(session_id)
            self.current_session_id = None
        
        # Remove the pending request
        del self.pending_asr_requests[request_id]
    
    def _handle_asr_timeout(self, request_id: str):
        """处理ASR超时"""
        if request_id in self.pending_asr_requests:
            logger.warning(f"ASR请求超时: {request_id}")
            del self.pending_asr_requests[request_id]
    
    def _handle_asr_error(self, request_id: str, error_msg: str):
        """处理ASR错误"""
        if request_id in self.pending_asr_requests:
            logger.error(f"ASR请求错误 [{request_id}]: {error_msg}")
            del self.pending_asr_requests[request_id]
    
    def _on_partial_recognition_result(self, session_id: str, result):
        """处理部分识别结果"""
        logger.debug(f"部分识别结果 [{session_id}]: {result.text}")
        
        if self.on_partial_result:
            self.on_partial_result(session_id, result.text, result.confidence)
    
    def _on_final_recognition_result(self, session_id: str, result):
        """处理最终识别结果"""
        logger.info(f"最终识别结果 [{session_id}]: {result.text}")
        
        if self.on_final_result:
            self.on_final_result(session_id, result.text, result.confidence)
    
    def _on_recognition_result_updated(self, session_id: str, merged_text: str, update_type: str):
        """处理识别结果更新"""
        logger.debug(f"识别结果更新 [{session_id}] ({update_type}): {merged_text}")
        
        if self.on_status_update:
            self.on_status_update({
                'type': 'result_update',
                'session_id': session_id,
                'text': merged_text,
                'update_type': update_type
            })
    
    def _on_session_complete(self, session_id: str, final_text: str):
        """处理会话完成"""
        logger.info(f"会话完成 [{session_id}]: {final_text}")
        
        if self.on_session_complete:
            self.on_session_complete(session_id, final_text)
    
    def _apply_audio_gain(self, audio_data: bytes) -> bytes:
        """应用音频增益处理"""
        if not self.enable_gain or self.volume_gain == 1.0:
            return audio_data
        
        try:
            # Unpack the byte data into an array of 16-bit integers
            count = len(audio_data) // 2
            if count == 0:
                return audio_data
            
            format_str = f"{count}h"
            shorts = struct.unpack(format_str, audio_data)
            
            # Apply the volume gain and guard against overflow
            amplified_shorts = []
            for sample in shorts:
                amplified_sample = int(sample * self.volume_gain)
                # Clamp to the 16-bit signed integer range
                amplified_sample = max(-32768, min(32767, amplified_sample))
                amplified_shorts.append(amplified_sample)
            
            # Pack back into bytes
            return struct.pack(format_str, *amplified_shorts)
            
        except Exception as e:
            logger.warning(f"音频增益处理失败: {e},使用原始音频")
            return audio_data
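    # The per-sample loop above is simple but runs in pure Python. For reference, an
    # equivalent vectorised sketch (numpy is not imported in this module, so this is
    # illustrative only):
    #   samples = np.frombuffer(audio_data, dtype=np.int16)
    #   gained = np.clip(samples * self.volume_gain, -32768, 32767)
    #   return gained.astype(np.int16).tobytes()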
    
    def _recording_loop(self):
        """录音主循环"""
        logger.info("开始流式录音循环")
        
        while self.is_recording_flag:
            try:
                data = self.stream.read(self.chunk, exception_on_overflow=False)
                
                # Apply audio gain
                if self.enable_gain:
                    data = self._apply_audio_gain(data)
                
                # Run the audio frame through the VAD
                vad_result = self.vad.process_audio_frame(data)
                
                # Push a status update
                if self.on_status_update:
                    self.on_status_update({
                        'type': 'vad_status',
                        'is_speaking': vad_result['is_speaking'],
                        'volume': vad_result['volume'],
                        'threshold': vad_result['threshold'],
                        'speech_duration': vad_result['speech_duration'],
                        'silence_duration': vad_result['silence_duration']
                    })
                
            except Exception as e:
                logger.error(f"录音循环错误: {e}")
                break
        
        logger.info("流式录音循环结束")
    
    def start_recording(self, device_index=None):
        """开始录音"""
        if self.is_recording_flag:
            logger.warning("录音已在进行中")
            return False
        
        try:
            self.audio = pyaudio.PyAudio()
            
            # Open the input audio stream
            self.stream = self.audio.open(
                format=self.format,
                channels=self.channels,
                rate=self.rate,
                input=True,
                input_device_index=device_index,
                frames_per_buffer=self.chunk
            )
            
            # Reset state
            self.is_recording_flag = True
            self.current_session_id = None
            self.pending_asr_requests.clear()
            
            # Reset the VAD and the recognition manager
            self.vad.reset()
            self.recognition_manager.reset()
            
            # Start the ASR client
            if not self.asr_client.is_connected():
                self.asr_client.start()
            
            # Start the recording thread
            self.recording_thread = threading.Thread(target=self._recording_loop)
            self.recording_thread.start()
            
            logger.info("流式录音开始")
            return True
            
        except Exception as e:
            logger.error(f"启动流式录音失败: {e}")
            self.stop_recording()
            return False
    
    def stop_recording(self):
        """停止录音"""
        logger.info("停止流式录音")
        
        self.is_recording_flag = False
        
        # Force-finish the current speech segment
        if self.current_session_id:
            final_result = self.vad.force_end_speech()
            if final_result:
                self._send_audio_for_recognition(final_result['audio_buffer'], 'final')
        
        if self.recording_thread:
            self.recording_thread.join(timeout=2)
        
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
            self.stream = None
        
        if self.audio:
            self.audio.terminate()
            self.audio = None
        
        # Shut down the ASR client
        self.asr_client.end()
        
        logger.info("流式录音已停止")
    
    def is_recording(self):
        """检查是否正在录音"""
        return self.is_recording_flag
    
    def get_status(self):
        """获取录音器状态"""
        vad_status = self.vad.get_status()
        recognition_status = self.recognition_manager.get_status()
        
        return {
            'is_recording': self.is_recording_flag,
            'current_session_id': self.current_session_id,
            'pending_asr_requests': len(self.pending_asr_requests),
            'asr_connected': self.asr_client.is_connected(),
            'vad_status': vad_status,
            'recognition_status': recognition_status
        }
    
    def get_current_result(self):
        """获取当前识别结果"""
        if self.current_session_id:
            return self.recognition_manager.get_merged_result(self.current_session_id)
        return ""
    
    def list_audio_devices(self):
        """列出可用的音频设备"""
        audio = pyaudio.PyAudio()
        devices = []
        
        for i in range(audio.get_device_count()):
            device_info = audio.get_device_info_by_index(i)
            if device_info['maxInputChannels'] > 0:
                devices.append({
                    'index': i,
                    'name': device_info['name'],
                    'channels': device_info['maxInputChannels'],
                    'sample_rate': device_info['defaultSampleRate']
                })
        
        audio.terminate()
        return devices

if __name__ == "__main__":
    # Test code
    def on_partial_result(session_id, text, confidence):
        print(f"[部分结果] {text} (置信度: {confidence:.2f})")
    
    def on_final_result(session_id, text, confidence):
        print(f"[最终结果] {text} (置信度: {confidence:.2f})")
    
    def on_session_complete(session_id, final_text):
        print(f"[会话完成] {final_text}")
    
    def on_status_update(status):
        if status['type'] == 'vad_status' and status['is_speaking']:
            print(f"[VAD] 语音中 - 音量:{status['volume']:.3f}, 时长:{status['speech_duration']:.1f}s")
    
    recorder = StreamingRecorder(
        volume_threshold=0.03,
        silence_duration=1.5,
        min_speech_duration=0.5,
        max_speech_duration=30.0,
        partial_result_interval=2.0
    )
    
    # Register callbacks
    recorder.on_partial_result = on_partial_result
    recorder.on_final_result = on_final_result
    recorder.on_session_complete = on_session_complete
    recorder.on_status_update = on_status_update
    
    print("可用音频设备:")
    devices = recorder.list_audio_devices()
    for device in devices:
        print(f"  {device['index']}: {device['name']}")
    
    print("\n按Enter开始流式录音...")
    input()
    
    recorder.start_recording()
    
    print("流式录音中... 按Enter停止")
    input()
    
    recorder.stop_recording()
    print("流式录音结束")