# server_audio_recorder_async_backup.py (17.1 KB)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
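"""
AIfeng/2025-07-01 16:51:01
Server-side audio recording module.
Server-side recording implementation based on the Fay architecture, with
support for real-time VAD and streaming transmission.
"""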

import asyncio
import threading
import time
import queue
import numpy as np
from typing import Optional, Callable, Dict, Any
import json
import logging

try:
    import pyaudio
    PYAUDIO_AVAILABLE = True
except ImportError:
    PYAUDIO_AVAILABLE = False
    print("警告: pyaudio未安装,服务端录音功能不可用")

import util
from funasr_asr import FunASRClient

class VoiceActivityDetector:
    """Energy-based voice activity detector with an adaptive threshold.

    Each frame's RMS volume is compared against ``volume_threshold``. A
    sentence starts on the first frame above the threshold and ends once a
    non-speech frame arrives at least ``silence_duration`` seconds after the
    silence began. While no speech is in progress, the threshold follows
    twice the mean of the recent frame volumes, with a floor of 0.005.
    """

    def __init__(self,
                 volume_threshold: float = 0.01,
                 silence_duration: float = 1.0,
                 speech_duration: float = 0.3):
        """
        Initialize the detector.

        Args:
            volume_threshold: Initial RMS threshold on the normalized
                (full scale == 1.0) volume above which a frame is speech.
            silence_duration: Seconds of continuous non-speech required to
                close a sentence.
            speech_duration: Speech duration threshold in seconds. It is
                stored on the instance but not consulted by ``detect``.
        """
        self.volume_threshold = volume_threshold
        self.silence_duration = silence_duration
        self.speech_duration = speech_duration

        # Detection state
        self.is_speaking = False
        self.silence_start_time = None
        self.speech_start_time = None
        self.last_volume = 0.0

        # Rolling history of the most recent frames (pre-roll for a sentence)
        self.audio_buffer = []
        self.buffer_max_size = 32  # keep the latest 32 frames

        # Adaptive threshold bookkeeping
        self.background_noise_level = 0.001
        self.noise_samples = []
        self.noise_sample_count = 100

    def calculate_volume(self, audio_data: np.ndarray) -> float:
        """Return the RMS volume of a frame on a normalized 0..1 scale.

        Integer PCM input (e.g. int16) is scaled by the dtype's full-scale
        value so that the result is comparable with the fractional
        thresholds used by this class. Floating-point input is assumed to be
        normalized already and is used as-is.

        Args:
            audio_data: Samples of one audio frame.

        Returns:
            The RMS volume as a Python float; 0.0 for an empty frame.
        """
        samples = np.asarray(audio_data)
        if samples.size == 0:
            return 0.0

        as_float = samples.astype(np.float32)
        if np.issubdtype(samples.dtype, np.integer):
            # Without this, int16 RMS values are in the hundreds/thousands
            # and every frame exceeds thresholds such as 0.01.
            as_float /= float(np.iinfo(samples.dtype).max) + 1.0

        return float(np.sqrt(np.mean(as_float ** 2)))

    def update_background_noise(self, volume: float):
        """Fold a frame volume into the noise estimate and adapt the threshold.

        Does nothing while a sentence is in progress. Otherwise the volume is
        appended to a window of at most ``noise_sample_count`` values; once at
        least 10 values are present the threshold becomes
        ``max(0.005, 2 * mean(window))``.

        Args:
            volume: Volume of the latest frame.
        """
        if self.is_speaking:
            return

        self.noise_samples.append(volume)
        if len(self.noise_samples) > self.noise_sample_count:
            self.noise_samples.pop(0)

        if len(self.noise_samples) >= 10:
            self.background_noise_level = float(np.mean(self.noise_samples)) * 2.0
            self.volume_threshold = max(0.005, self.background_noise_level)

    def detect(self, audio_data: np.ndarray) -> Dict[str, Any]:
        """Classify one frame and update the sentence state machine.

        Args:
            audio_data: Samples of one audio frame.

        Returns:
            {
                'is_speech': bool,          # frame volume is above threshold
                'is_sentence_start': bool,  # this frame opened a sentence
                'is_sentence_end': bool,    # this frame closed a sentence
                'volume': float,            # RMS volume of this frame
                'buffered_audio': Optional[np.ndarray],
                    # on sentence start with more than one buffered frame:
                    # the buffered frames concatenated, INCLUDING this frame
                'threshold': float          # threshold in effect afterwards
            }
        """
        current_time = time.time()
        volume = self.calculate_volume(audio_data)
        self.last_volume = volume

        # The noise estimate is refreshed before the frame is classified.
        self.update_background_noise(volume)

        # Record the frame in the rolling history.
        self.audio_buffer.append(audio_data)
        if len(self.audio_buffer) > self.buffer_max_size:
            self.audio_buffer.pop(0)

        is_speech = volume > self.volume_threshold
        is_sentence_start = False
        is_sentence_end = False
        buffered_audio = None

        if is_speech:
            if not self.is_speaking:
                # Speech onset
                self.is_speaking = True
                self.speech_start_time = current_time
                is_sentence_start = True

                # Hand back the history so the audio just before the onset
                # is not lost.
                if len(self.audio_buffer) > 1:
                    buffered_audio = np.concatenate(self.audio_buffer)

                util.log(1, f"检测到语音开始,音量: {volume:.4f}")

            # Any speech frame cancels a pending silence countdown.
            self.silence_start_time = None
        elif self.is_speaking:
            if self.silence_start_time is None:
                self.silence_start_time = current_time
            elif current_time - self.silence_start_time >= self.silence_duration:
                # Silence lasted long enough: the sentence is over.
                self.is_speaking = False
                is_sentence_end = True
                spoken_for = current_time - self.speech_start_time if self.speech_start_time else 0

                util.log(1, f"检测到语音结束,持续时间: {spoken_for:.2f}秒")

                self.speech_start_time = None
                self.silence_start_time = None

        return {
            'is_speech': is_speech,
            'is_sentence_start': is_sentence_start,
            'is_sentence_end': is_sentence_end,
            'volume': volume,
            'buffered_audio': buffered_audio,
            'threshold': self.volume_threshold
        }

class ServerAudioRecorder:
    """Capture audio through PyAudio on a background thread.

    Every chunk read from the input stream is run through a
    ``VoiceActivityDetector``. Speech chunks are forwarded to the connected
    ASR client (if any), the registered callbacks are invoked, and the chunk
    is placed on a bounded queue that can be drained with ``get_audio_data``.
    """

    def __init__(self,
                 sample_rate: int = 16000,
                 channels: int = 1,
                 chunk_size: int = 1024,
                 device_index: Optional[int] = None):
        """
        Initialize the recorder without opening any audio device.

        Args:
            sample_rate: Sampling rate passed to the input stream.
            channels: Number of channels passed to the input stream.
            chunk_size: Frames per buffer / frames read per loop iteration.
            device_index: Input device index, or None for the default device.

        Raises:
            ImportError: If pyaudio could not be imported.
        """
        if not PYAUDIO_AVAILABLE:
            raise ImportError("pyaudio未安装,无法使用服务端录音功能")

        self.sample_rate = sample_rate
        self.channels = channels
        self.chunk_size = chunk_size
        self.device_index = device_index

        # PyAudio handles, created in start_recording()
        self.pyaudio_instance = None
        self.audio_stream = None

        # Recording state
        self.is_recording = False
        self.recording_thread = None
        self.stop_event = threading.Event()

        # Bounded queue of processed chunks (oldest entry dropped when full)
        self.audio_queue = queue.Queue(maxsize=100)

        # Voice activity detector
        self.vad = VoiceActivityDetector()

        # ASR client, attached through connect_asr()
        self.asr_client = None

        # User callbacks
        self.on_speech_start = None
        self.on_speech_end = None
        self.on_audio_data = None
        self.on_recognition_result = None

        util.log(1, f"服务端录音器初始化完成: {sample_rate}Hz, {channels}ch, chunk={chunk_size}")

    def list_audio_devices(self) -> list:
        """List the devices that offer at least one input channel.

        Returns:
            A list of dicts with the keys 'index', 'name', 'channels' and
            'sample_rate'; an empty list when pyaudio is unavailable.
        """
        if not PYAUDIO_AVAILABLE:
            return []

        devices = []
        p = pyaudio.PyAudio()

        try:
            for i in range(p.get_device_count()):
                device_info = p.get_device_info_by_index(i)
                if device_info['maxInputChannels'] > 0:
                    devices.append({
                        'index': i,
                        'name': device_info['name'],
                        'channels': device_info['maxInputChannels'],
                        'sample_rate': device_info['defaultSampleRate']
                    })
        finally:
            # Always release the temporary PyAudio instance.
            p.terminate()

        return devices

    def set_callbacks(self,
                     on_speech_start: Optional[Callable] = None,
                     on_speech_end: Optional[Callable] = None,
                     on_audio_data: Optional[Callable] = None,
                     on_recognition_result: Optional[Callable] = None):
        """Register the callbacks; every call replaces all four slots.

        Args:
            on_speech_start: Called with no arguments when a sentence starts.
            on_speech_end: Called with no arguments when a sentence ends.
            on_audio_data: Called with ``(audio_array, vad_result)`` for
                every chunk.
            on_recognition_result: Called with the ASR result.
        """
        self.on_speech_start = on_speech_start
        self.on_speech_end = on_speech_end
        self.on_audio_data = on_audio_data
        self.on_recognition_result = on_recognition_result

    def connect_asr(self, asr_client: FunASRClient):
        """Attach an ASR client and route its results to this recorder.

        Args:
            asr_client: The client to use; a falsy value detaches the
                current client without registering a callback.
        """
        self.asr_client = asr_client
        if asr_client:
            asr_client.set_result_callback(self._on_asr_result)
            util.log(1, "ASR客户端已连接")

    def _on_asr_result(self, result: str):
        """Forward an ASR result to ``on_recognition_result`` if it is set."""
        if self.on_recognition_result:
            self.on_recognition_result(result)

    def start_recording(self) -> bool:
        """Open the input stream and start the recording thread.

        Returns:
            True if recording is running (including when it already was),
            False if start-up failed. On failure every resource acquired so
            far is released again.
        """
        if self.is_recording:
            util.log(2, "录音已在进行中")
            return True

        try:
            # Create the PyAudio instance
            self.pyaudio_instance = pyaudio.PyAudio()

            # Open the input stream
            self.audio_stream = self.pyaudio_instance.open(
                format=pyaudio.paInt16,
                channels=self.channels,
                rate=self.sample_rate,
                input=True,
                input_device_index=self.device_index,
                frames_per_buffer=self.chunk_size
            )

            # Start the recording thread
            self.is_recording = True
            self.stop_event.clear()
            self.recording_thread = threading.Thread(target=self._recording_loop, daemon=True)
            self.recording_thread.start()

            # Start the ASR client
            if self.asr_client:
                self.asr_client.start()
                self.asr_client.start_recognition()

            util.log(1, "服务端录音已开始")
            return True

        except Exception as e:
            util.log(3, f"启动录音失败: {e}")
            # stop_recording() also releases a PyAudio instance/stream that
            # was created before the recording flag was raised.
            self.stop_recording()
            return False

    def stop_recording(self):
        """Stop recording and release the audio resources.

        Safe to call when not recording: in that case only leftover PyAudio
        resources (from a start attempt that failed early) are released.
        """
        if not self.is_recording:
            self._release_audio_resources()
            return

        util.log(1, "正在停止服务端录音...")

        # Signal the recording loop to exit
        self.is_recording = False
        self.stop_event.set()

        # Wait for the recording thread, unless we ARE that thread (e.g. a
        # callback asked to stop): joining the current thread raises
        # RuntimeError.
        worker = self.recording_thread
        if (worker is not None
                and worker.is_alive()
                and worker is not threading.current_thread()):
            worker.join(timeout=2)

        self._release_audio_resources()

        # Stop ASR recognition; failures are logged like the other cleanup
        # steps so that shutdown always completes.
        if self.asr_client:
            try:
                self.asr_client.stop_recognition()
            except Exception as e:
                util.log(2, f"停止ASR识别时出错: {e}")

        util.log(1, "服务端录音已停止")

    def _release_audio_resources(self):
        """Close the input stream and terminate PyAudio, if present."""
        if self.audio_stream:
            try:
                self.audio_stream.stop_stream()
                self.audio_stream.close()
            except Exception as e:
                util.log(2, f"关闭音频流时出错: {e}")
            finally:
                self.audio_stream = None

        if self.pyaudio_instance:
            try:
                self.pyaudio_instance.terminate()
            except Exception as e:
                util.log(2, f"关闭PyAudio时出错: {e}")
            finally:
                self.pyaudio_instance = None

    def _recording_loop(self):
        """Thread body: read chunks until stopped or until an error occurs.

        NOTE(review): when the loop ends because of an error,
        ``is_recording`` is left unchanged, so the recorder still reports
        itself as recording until ``stop_recording`` is called.
        """
        util.log(1, "录音线程已启动")

        try:
            while self.is_recording and not self.stop_event.is_set():
                try:
                    # Read one chunk from the input stream
                    audio_data = self.audio_stream.read(
                        self.chunk_size,
                        exception_on_overflow=False
                    )
                    self._process_chunk(audio_data)

                except Exception as e:
                    # Errors raised while shutting down are expected (the
                    # stream may have been closed) and are not logged.
                    if self.is_recording:
                        util.log(3, f"录音循环中出错: {e}")
                    break

        except Exception as e:
            util.log(3, f"录音线程异常: {e}")

        util.log(1, "录音线程已结束")

    def _process_chunk(self, audio_data: bytes):
        """Run VAD on one raw chunk, then feed ASR, callbacks and the queue."""
        # Interpret the raw bytes as int16 samples
        audio_array = np.frombuffer(audio_data, dtype=np.int16)

        # Voice activity detection
        vad_result = self.vad.detect(audio_array)

        current_chunk_sent = False
        if vad_result['is_sentence_start']:
            if self.on_speech_start:
                self.on_speech_start()

            # Send the pre-roll. The detector's buffered audio already ends
            # with the current chunk, so the chunk must not be sent again
            # through the real-time path below.
            buffered_audio = vad_result['buffered_audio']
            if buffered_audio is not None and self.asr_client:
                # tobytes() yields an independent bytes copy of the samples.
                self.asr_client.send_audio(buffered_audio.astype(np.int16).tobytes())
                current_chunk_sent = True

        # Send the real-time chunk
        if vad_result['is_speech'] and self.asr_client and not current_chunk_sent:
            self.asr_client.send_audio(audio_data)

        if vad_result['is_sentence_end']:
            if self.on_speech_end:
                self.on_speech_end()

        # Per-chunk data callback
        if self.on_audio_data:
            self.on_audio_data(audio_array, vad_result)

        self._enqueue_chunk(audio_array, vad_result)

    def _enqueue_chunk(self, audio_array: np.ndarray, vad_result: Dict[str, Any]):
        """Queue a processed chunk, dropping the oldest entry when full."""
        item = {
            'audio': audio_array,
            'vad': vad_result,
            'timestamp': time.time()
        }
        try:
            self.audio_queue.put_nowait(item)
        except queue.Full:
            # Make room by discarding the oldest entry, then retry once.
            try:
                self.audio_queue.get_nowait()
                self.audio_queue.put_nowait(item)
            except queue.Empty:
                pass

    def get_audio_data(self, timeout: float = 0.1) -> Optional[Dict]:
        """Pop the next processed chunk from the queue.

        Args:
            timeout: Maximum number of seconds to wait for a chunk.

        Returns:
            A dict with the keys 'audio', 'vad' and 'timestamp', or None if
            nothing arrived within ``timeout``.
        """
        try:
            return self.audio_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of the recorder configuration and VAD state."""
        return {
            'is_recording': self.is_recording,
            'sample_rate': self.sample_rate,
            'channels': self.channels,
            'chunk_size': self.chunk_size,
            'device_index': self.device_index,
            'queue_size': self.audio_queue.qsize(),
            'volume_threshold': self.vad.volume_threshold,
            'last_volume': self.vad.last_volume,
            'is_speaking': self.vad.is_speaking,
            'silence_timeout': getattr(self.vad, 'silence_duration', 1.0),
            'speech_timeout': getattr(self.vad, 'speech_duration', 0.3),
            'pyaudio_available': PYAUDIO_AVAILABLE,
            'recorder_created': True
        }

    def __del__(self):
        """Best-effort cleanup when the recorder is garbage-collected."""
        try:
            self.stop_recording()
        except Exception:
            # The finalizer also runs for instances whose __init__ raised
            # (no attributes set yet) and during interpreter shutdown;
            # there is nothing useful to do with an error here.
            pass

# Module-wide recorder instance; None until create_global_recorder() sets it
# and again after destroy_global_recorder().
_global_recorder = None

def get_global_recorder() -> Optional[ServerAudioRecorder]:
    """Return the module-wide recorder, or None if none is currently set."""
    recorder = _global_recorder
    return recorder

def create_global_recorder(**kwargs) -> ServerAudioRecorder:
    """Build a new module-wide recorder, stopping the previous one first.

    Args:
        **kwargs: Forwarded unchanged to the ``ServerAudioRecorder``
            constructor.

    Returns:
        The newly created recorder, which is also stored as the module-wide
        instance.
    """
    global _global_recorder

    previous = _global_recorder
    if previous:
        previous.stop_recording()

    fresh = ServerAudioRecorder(**kwargs)
    _global_recorder = fresh
    return fresh

def destroy_global_recorder():
    """Stop the module-wide recorder and forget it; no-op if none is set."""
    global _global_recorder

    current = _global_recorder
    if not current:
        return

    current.stop_recording()
    _global_recorder = None

if __name__ == "__main__":
    # Manual smoke test: records from the default input device and prints
    # VAD activity until interrupted with Ctrl+C.
    import sys

    def test_recorder():
        """Interactively exercise ServerAudioRecorder from the console."""
        print("=== 服务端录音器测试 ===")

        if not PYAUDIO_AVAILABLE:
            print("错误: pyaudio未安装")
            return

        # List the available input devices
        recorder = ServerAudioRecorder()
        devices = recorder.list_audio_devices()
        print(f"可用音频设备 ({len(devices)}个):")
        for device in devices:
            print(f"  {device['index']}: {device['name']} ({device['channels']}ch, {device['sample_rate']}Hz)")

        # Callbacks that report VAD events on the console
        def on_speech_start():
            print("🎤 语音开始")

        def on_speech_end():
            print("⏹️ 语音结束")

        def on_audio_data(audio, vad_result):
            if vad_result['is_speech']:
                print(f"🔊 音量: {vad_result['volume']:.4f}, 阈值: {vad_result['threshold']:.4f}")

        recorder.set_callbacks(
            on_speech_start=on_speech_start,
            on_speech_end=on_speech_end,
            on_audio_data=on_audio_data
        )

        # Start recording and poll the status until Ctrl+C
        print("\n开始录音测试(按Ctrl+C停止)...")
        if recorder.start_recording():
            try:
                while True:
                    time.sleep(0.1)
                    status = recorder.get_status()
                    if status['is_speaking']:
                        print(f"\r🎙️ 录音中... 音量: {status['last_volume']:.4f}", end="")
                    else:
                        # get_status() reports the VAD threshold under
                        # 'volume_threshold'.
                        print(f"\r⏸️ 等待语音... 阈值: {status['volume_threshold']:.4f}", end="")
            except KeyboardInterrupt:
                print("\n\n停止录音测试")
            finally:
                recorder.stop_recording()
        else:
            print("启动录音失败")

    test_recorder()