recorder_sync.py 20.9 KB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AIfeng/2025-01-02 10:27:06
服务端音频录音模块 - 同步版本
基于eman-Fay-main-copy项目的同步实现模式
"""

import asyncio
import audioop
import math
import os
import tempfile
import threading
import time
import wave
from abc import abstractmethod
from collections import deque
from queue import Queue
from typing import Optional, Callable

import numpy as np

from funasr_asr_sync import FunASRSync
from core import get_web_instance, get_instance
from scheduler.thread_manager import MyThread
from utils import util
from utils import config_util as cfg
from core import fay_core
from core import interact

try:
    import pyaudio
    PYAUDIO_AVAILABLE = True
except ImportError:
    PYAUDIO_AVAILABLE = False
    print("警告: pyaudio未安装,服务端录音功能不可用")

# Microphone attack time (seconds).
# NOTE(review): not referenced anywhere else in this file.
_ATTACK = 0.1

# Microphone release time (seconds).
# NOTE(review): not referenced anywhere else in this file.
_RELEASE = 0.5

class RecorderSync:
    """Synchronous server-side recorder.

    A background thread reads 16-bit PCM chunks from a PyAudio input stream.
    It detects speech by comparing each chunk's RMS level against a dynamic
    threshold. Each finished utterance is sent to the ASR client returned by
    ``asrclient()``, and the recognized text is handed to ``on_speaking``.
    Wake-word settings in ``cfg.config['source']`` can gate that hand-off.

    NOTE(review): ``on_speaking`` carries ``@abstractmethod``, but this class
    does not derive from ``abc.ABC``, so nothing enforces an override.
    ``create_global_recorder`` instantiates this base class directly, and its
    ``on_speaking`` discards the text. Confirm that this is intended.
    """

    def __init__(self, fay):
        """Initialise recorder state without opening any audio device.

        Args:
            fay: The core object. This class calls its ``on_interact`` and
                assigns its ``sound_query`` attribute.
        """
        self.__fay = fay
        # True only while a recording loop should run. Previously this started
        # as True, so is_recording() reported True before start_recording().
        self.__running = False
        self.__processing = False
        self.__history_level = []  # recent per-chunk RMS levels (<= __MAX_BLOCK)
        self.__history_data = []   # raw PCM chunks of the utterance being captured
        # Speech threshold as a fraction of __MAX_LEVEL. It is kept low for
        # sensitivity and re-estimated from recent levels after an empty ASR result.
        self.__dynamic_threshold = 0.05

        self.__MAX_LEVEL = 25000
        self.__MAX_BLOCK = 100

        # ASR mode configuration
        self.ASRMode = cfg.ASR_mode
        self.__aLiNls = None
        self.is_awake = False
        self.wakeup_matched = False

        # Timer that clears ``wakeup_matched`` (common wake-word mode). It is
        # always defined so the handlers never hit AttributeError if the config
        # changes after construction.
        self.timer = None
        if cfg.config['source']['wake_word_enabled']:
            self.timer = threading.Timer(60, self.reset_wakeup_status)
            self.timer.daemon = True

        self.username = 'User'  # default user
        self.channels = 1
        self.sample_rate = 16000  # fixed at 16 kHz to match the ASR service
        self.is_reading = False
        self.stream = None

        self.__last_ws_notify_time = 0
        self.__ws_notify_interval = 0.5  # minimum notify interval (s)
        self.__ws_notify_thread = None
        # Strong references to scheduled web-push tasks, so they are not
        # garbage-collected before they finish.
        self.__web_tasks = set()

        # PyAudio configuration
        self.chunk_size = 1024
        self.device_index = None
        self.pyaudio_instance = None
        self.audio_stream = None
        self.recording_thread = None

        # Status-change callback, invoked with get_status() output.
        self.status_change_callback = None

        util.log(1, f"同步录音器初始化完成: {self.sample_rate}Hz, {self.channels}ch")

    def asrclient(self):
        """Create a new ASR client for one utterance.

        Every ASR mode currently uses FunASRSync. Extend this method to plug
        in other engines.
        """
        return FunASRSync(self.username)

    def save_buffer_to_file(self, buffer, directory="cache_data"):
        """Write raw 16-bit PCM to a new WAV file and return its path.

        The file is created with ``delete=False`` so it outlives this call,
        because the ASR client is given its path. The header uses
        ``self.channels`` and ``self.sample_rate``, the same values used to
        open the input stream.

        Args:
            buffer: Raw 16-bit PCM bytes.
            directory: Target directory. It is created if missing.

        Returns:
            The path of the written ``.wav`` file.
        """
        os.makedirs(directory, exist_ok=True)
        # Write through the temp-file object itself. The old code reopened the
        # file by name and never closed the NamedTemporaryFile handle.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav", dir=directory) as temp_file:
            with wave.open(temp_file, 'wb') as wf:
                wf.setnchannels(self.channels)
                wf.setsampwidth(2)
                wf.setframerate(self.sample_rate)
                wf.writeframes(buffer)
            return temp_file.name

    @staticmethod
    def _rms16(data):
        """Return the RMS of native-endian 16-bit PCM as an int.

        This gives the same result as ``audioop.rms(data, 2)``. ``audioop`` is
        deprecated and was removed in Python 3.13. Empty input yields 0. A
        byte count that is not a whole number of samples raises ValueError.
        """
        samples = np.frombuffer(data, dtype=np.int16)
        if samples.size == 0:
            return 0
        return int(math.sqrt(float(np.mean(samples.astype(np.float64) ** 2))))

    @staticmethod
    def _split_wake_words(raw):
        """Split the comma-separated wake-word setting into non-empty words.

        Empty entries, such as those produced by a trailing comma, are dropped.
        ``"" in text`` and ``text.startswith("")`` are always True, so an empty
        entry would otherwise wake on any speech.
        """
        return [word.strip() for word in raw.split(',') if word.strip()]

    def __get_history_average(self, number):
        """Return the mean of the last ``number`` RMS levels, or 0 if none.

        At least the most recent level is used, even when ``number`` <= 0.
        """
        recent = self.__history_level[-max(number, 1):]
        return sum(recent) / len(recent) if recent else 0

    def __get_history_percentage(self, number):
        """Map the recent average level onto the threshold scale.

        Computes (average / __MAX_LEVEL) * 1.05 + 0.02.
        """
        return (self.__get_history_average(number) / self.__MAX_LEVEL) * 1.05 + 0.02

    def reset_wakeup_status(self):
        """Clear the wake state and re-allow auto play (run by ``self.timer``)."""
        self.wakeup_matched = False
        with fay_core.auto_play_lock:
            fay_core.can_auto_play = True
        util.log(1, "唤醒状态已重置")

    def __restart_wakeup_timer(self):
        """Cancel any pending wake-reset timer and start a fresh 60 s one."""
        if self.timer is not None:
            self.timer.cancel()
        self.timer = threading.Timer(60, self.reset_wakeup_status)
        self.timer.daemon = True  # must not keep the process alive on exit
        self.timer.start()

    def __send_web_status(self, message):
        """Push ``message`` via ``send_direct_message`` on a best-effort basis.

        This runs on the recording thread, which has no running asyncio loop.
        There, the former bare ``asyncio.create_task(...)`` raised
        RuntimeError and aborted wake-word and ASR handling. The coroutine is
        scheduled only when a loop is running in this thread. Otherwise it is
        discarded and a warning is logged. This method never raises.

        NOTE(review): delivering from this thread would need
        ``asyncio.run_coroutine_threadsafe`` with the web server's loop,
        which this file cannot reach.
        """
        try:
            pending = get_web_instance().send_direct_message(message)
        except Exception as e:
            util.log(2, f"Web状态消息发送失败: {e}")
            return
        if not asyncio.iscoroutine(pending):
            return
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            pending.close()  # avoid the "coroutine was never awaited" warning
            util.log(2, "当前线程没有运行中的事件循环, Web状态消息未发送")
            return
        task = loop.create_task(pending)
        self.__web_tasks.add(task)
        task.add_done_callback(self.__web_tasks.discard)

    def __waitingResult(self, iat, audio_data):
        """Send one utterance to ``iat``, wait up to 10 s, and dispatch the text.

        ``__processing`` is True for the duration of the call. It is always
        cleared, with a status notification, on exit. Previously the
        "waiting for wake word" paths and exceptions left it stuck at True.
        """
        self.__processing = True
        self._notify_status_change()
        try:
            tm = time.time()
            if self.ASRMode in ("funasr", "sensevoice"):
                file_url = self.save_buffer_to_file(audio_data)
                util.log(1, f"音频数据已保存到文件: {file_url}, 大小: {len(audio_data)} bytes")
                iat.send_url(file_url)
                util.log(1, f"音频文件URL已发送到ASR客户端: {file_url}")

            # Poll until the client reports completion or 10 s elapse.
            while not iat.done and time.time() - tm < 10:
                time.sleep(0.01)

            text = iat.finalResults or ""  # tolerate a client that leaves None
            util.log(1, f"语音处理完成!耗时: {math.floor((time.time() - tm) * 1000)} ms")

            if not text:
                self.__handle_empty_result()
            elif cfg.config['source']['wake_word_enabled']:
                self.__handle_wake_word(text)
            else:
                self.on_speaking(text)
        finally:
            self.__processing = False
            self._notify_status_change()

    def __handle_empty_result(self):
        """Handle an utterance with no recognized text.

        Raises the speech threshold toward the recent level average and
        resets the web panel.
        """
        util.log(1, "[!] 语音未检测到内容!")
        self.__dynamic_threshold = self.__get_history_percentage(30)

        if get_web_instance().is_connected(self.username):
            self.__send_web_status({
                "type": "status_update",
                "panelMsg": "",
                'Username': self.username,
                'robot': f'{cfg.fay_url}/robot/Normal.jpg'
            })

        # Human clients are only logged here, not notified.
        util.log(1, f"语音未检测到内容[{self.username}]")

    def __handle_wake_word(self, text):
        """Route ``text`` to the handler for the configured wake-word type.

        Handled types are 'common' and 'front'. Any other type is ignored.
        """
        util.log(1, f"检测到语音: {text}")

        wake_word_type = cfg.config['source']['wake_word_type']
        if wake_word_type == 'common':
            self.__handle_common_wake_word(text)
        elif wake_word_type == 'front':
            self.__handle_front_wake_word(text)

    def __handle_common_wake_word(self, text):
        """Handle common mode: any wake word anywhere in ``text`` wakes.

        Once woken, utterances go to ``on_speaking`` until 60 s pass without one.
        """
        if self.wakeup_matched:
            self.on_speaking(text)
            self.__restart_wakeup_timer()  # extend the wake window
            return

        words = self._split_wake_words(cfg.config['source']['wake_word'])
        if not any(word in text for word in words):
            util.log(1, "[!] 待唤醒!")
            self.__notify_waiting_wake()
            return

        util.log(1, "唤醒成功!")
        self.__notify_wake_success()
        self.wakeup_matched = True

        with fay_core.auto_play_lock:
            fay_core.can_auto_play = False

        intt = interact.Interact("auto_play", 2, {'user': self.username, 'text': "在呢,你说?"})
        self.__fay.on_interact(intt)
        # Start the idle window now. The timer used to be only cancelled here,
        # so a wake with no follow-up utterance never expired.
        self.__restart_wakeup_timer()

    def __handle_front_wake_word(self, text):
        """Handle front mode: ``text`` must start with a wake word.

        The whole utterance, wake word included, is forwarded; the wake word
        is not stripped.
        """
        words = self._split_wake_words(cfg.config['source']['wake_word'])
        if not text.startswith(tuple(words)):
            util.log(1, "[!] 待唤醒!")
            self.__notify_waiting_wake()
            return

        util.log(1, "唤醒成功!")
        self.__notify_wake_success()

        self.__fay.sound_query = Queue()
        time.sleep(0.3)
        self.on_speaking(text)

    def __notify_wake_success(self):
        """Show "wake succeeded" on the web panel, if connected, and log it."""
        # Checks the same web-client connection the message is sent to.
        # It previously checked is_connected_human instead.
        if get_web_instance().is_connected(self.username):
            self.__send_web_status({
                "type": "status_update",
                "panelMsg": "唤醒成功!",
                "Username": self.username,
                'robot': f'{cfg.fay_url}/robot/Listening.jpg'
            })

        # Human clients are only logged here, not notified.
        util.log(1, f"唤醒成功[{self.username}]")

    def __notify_waiting_wake(self):
        """Show "waiting for wake word" on the web panel, if connected, and log it."""
        if get_web_instance().is_connected(self.username):
            self.__send_web_status({
                "type": "status_update",
                "panelMsg": "[!] 待唤醒!",
                "Username": self.username,
                'robot': f'{cfg.fay_url}/robot/Normal.jpg'
            })

        # Human clients are only logged here, not notified.
        util.log(1, f"待唤醒状态[{self.username}]")

    def list_audio_devices(self) -> list:
        """List input-capable audio devices.

        Returns:
            A list of dicts with keys index, name, channels and sample_rate.
            The list is empty when pyaudio is unavailable or enumeration fails.
        """
        if not PYAUDIO_AVAILABLE:
            return []

        devices = []
        try:
            if not self.pyaudio_instance:
                self.pyaudio_instance = pyaudio.PyAudio()

            for i in range(self.pyaudio_instance.get_device_count()):
                device_info = self.pyaudio_instance.get_device_info_by_index(i)
                if device_info['maxInputChannels'] > 0:
                    devices.append({
                        'index': i,
                        'name': device_info['name'],
                        'channels': device_info['maxInputChannels'],
                        'sample_rate': int(device_info['defaultSampleRate'])
                    })
        except Exception as e:
            util.log(3, f"列出音频设备时出错: {e}")

        return devices

    def get_stream(self):
        """Open, store in ``self.audio_stream`` and return a PyAudio input stream.

        Subclasses may override this. The stream always uses
        ``self.sample_rate``, even when the device's default rate differs;
        the mismatch is only logged.

        Raises:
            ImportError: pyaudio is not installed.
            Exception: re-raised from PyAudio if opening the stream fails.
        """
        if not PYAUDIO_AVAILABLE:
            raise ImportError("pyaudio未安装,无法创建音频流")

        try:
            if not self.pyaudio_instance:
                self.pyaudio_instance = pyaudio.PyAudio()

            if self.device_index is not None:
                try:
                    device_info = self.pyaudio_instance.get_device_info_by_index(self.device_index)
                    device_sample_rate = int(device_info['defaultSampleRate'])
                    util.log(1, f"设备默认采样率: {device_sample_rate}Hz")
                    if device_sample_rate != self.sample_rate:
                        util.log(2, f"设备采样率({device_sample_rate}Hz)与目标采样率({self.sample_rate}Hz)不匹配,将使用目标采样率")
                except Exception as e:
                    util.log(2, f"获取设备信息失败: {e},使用默认采样率")

            self.audio_stream = self.pyaudio_instance.open(
                format=pyaudio.paInt16,
                channels=self.channels,
                rate=self.sample_rate,  # always the ASR rate (16 kHz)
                input=True,
                input_device_index=self.device_index,
                frames_per_buffer=self.chunk_size
            )

            util.log(1, f"音频流创建成功: {self.sample_rate}Hz, 设备索引: {self.device_index}")
            return self.audio_stream

        except Exception as e:
            util.log(3, f"创建音频流时出错: {e}")
            raise

    def start_recording(self):
        """Open the input stream and start the daemon recording thread.

        Returns:
            True on success. False if pyaudio is missing or setup fails.
        """
        if not PYAUDIO_AVAILABLE:
            util.log(3, "pyaudio未安装,无法开始录音")
            return False

        try:
            self.__running = True
            stream = self.get_stream()

            # Pass the stream to the thread. __record used to call get_stream()
            # again, which opened a second stream and leaked the first.
            self.recording_thread = threading.Thread(target=self.__record, args=(stream,))
            self.recording_thread.daemon = True
            self.recording_thread.start()

            util.log(1, "录音已开始")
            return True

        except Exception as e:
            self.__running = False
            util.log(3, f"开始录音时出错: {e}")
            return False

    def stop_recording(self):
        """Stop the loop and release the stream and the PyAudio instance.

        This does not join the recording thread. A utterance that is already
        being processed finishes on that thread.
        """
        self.__running = False

        if self.audio_stream:
            try:
                self.audio_stream.stop_stream()
                self.audio_stream.close()
            except Exception as e:
                util.log(2, f"关闭音频流时出错: {e}")
            finally:
                self.audio_stream = None

        if self.pyaudio_instance:
            try:
                self.pyaudio_instance.terminate()
            except Exception as e:
                util.log(2, f"终止PyAudio时出错: {e}")
            finally:
                self.pyaudio_instance = None

        util.log(1, "录音已停止")

    def __record(self, stream=None):
        """Recording loop: read chunks, gate on level, and run ASR per utterance.

        Args:
            stream: An already-open input stream. When None, one is opened via
                ``get_stream()``.
        """
        try:
            if stream is None:
                stream = self.get_stream()
            util.log(1, "开始录音循环")

            # The last 10 chunks, prepended to an utterance so its onset is not clipped.
            pre_buffer = deque(maxlen=10)
            debug_counter = -1

            while self.__running:
                try:
                    data = stream.read(self.chunk_size, exception_on_overflow=False)

                    level = self._rms16(data)
                    self.__history_level.append(level)
                    if len(self.__history_level) > self.__MAX_BLOCK:
                        self.__history_level.pop(0)

                    threshold_value = self.__dynamic_threshold * self.__MAX_LEVEL

                    # Log levels on the first chunk and every 100th after it.
                    debug_counter += 1
                    if debug_counter % 100 == 0:
                        util.log(1, f"音量监控: 当前={level}, 阈值={threshold_value:.2f}, 动态阈值={self.__dynamic_threshold:.3f}")

                    pre_buffer.append(data)

                    if level > threshold_value:
                        if self.is_reading:
                            self.__history_data.append(data)
                        else:
                            # pre_buffer already ends with ``data``. Appending it
                            # again, as before, duplicated the onset chunk.
                            self.is_reading = True
                            self.__history_data = list(pre_buffer)
                            util.log(1, f"检测到语音开始,音量: {level}, 阈值: {threshold_value:.2f}, 预缓冲: {len(pre_buffer)} 块")
                            self._notify_status_change()
                    elif self.is_reading:
                        self.is_reading = False
                        util.log(1, f"语音结束,数据块数: {len(self.__history_data)}")
                        self._notify_status_change()
                        self.__process_utterance()

                    time.sleep(0.01)

                except Exception as e:
                    util.log(3, f"录音循环中出错: {e}")
                    time.sleep(0.1)

        except Exception as e:
            util.log(3, f"录音主循环出错: {e}")
        finally:
            util.log(1, "录音循环结束")

    def __process_utterance(self):
        """Send the captured chunks to a fresh ASR client, then clear the buffer.

        Utterances of 3 chunks or fewer are skipped. ``iat.end()`` runs even
        if recognition raises.
        """
        chunks = self.__history_data
        self.__history_data = []

        if len(chunks) <= 3:
            util.log(1, f"音频数据不足,跳过处理: {len(chunks)} 块")
            return

        audio_data = b''.join(chunks)
        util.log(1, f"开始处理音频数据,总长度: {len(audio_data)} bytes")

        iat = self.asrclient()
        iat.start()
        try:
            time.sleep(0.1)  # give the ASR client time to connect
            self.__waitingResult(iat, audio_data)
        finally:
            iat.end()

    @abstractmethod
    def on_speaking(self, text):
        """Receive recognized text. Subclasses should override this; the base does nothing."""
        pass

    def is_recording(self):
        """Return True while a recording loop is meant to be running."""
        return self.__running

    def get_status(self):
        """Return a snapshot dict of recorder state, as passed to the status callback."""
        return {
            'is_recording': self.__running,
            'is_processing': self.__processing,
            'is_reading': self.is_reading,
            'dynamic_threshold': self.__dynamic_threshold,
            'username': self.username,
            'sample_rate': self.sample_rate,
            'device_index': self.device_index
        }

    def set_status_change_callback(self, callback):
        """Register ``callback(status_dict)``, called on every state change."""
        self.status_change_callback = callback

    def _notify_status_change(self):
        """Invoke the status callback, if set. Its exceptions are logged, never raised."""
        if self.status_change_callback:
            try:
                status = self.get_status()
                self.status_change_callback(status)
            except Exception as e:
                util.log(2, f"状态变化通知失败: {e}")

# Module-level recorder singleton, managed by create_global_recorder() and
# destroy_global_recorder(); None when no recorder exists.
_global_recorder = None

def get_global_recorder():
    """Return the module-level recorder, or None if none has been created."""
    current = _global_recorder
    return current

def create_global_recorder(fay):
    """Return the module-level recorder, creating it with *fay* on the first call.

    Later calls ignore *fay* and return the existing instance.
    """
    global _global_recorder
    if _global_recorder is not None:
        return _global_recorder
    _global_recorder = RecorderSync(fay)
    return _global_recorder

def destroy_global_recorder():
    """Stop the module-level recorder, if there is one, and forget it."""
    global _global_recorder
    current = _global_recorder
    if not current:
        return
    current.stop_recording()
    _global_recorder = None

if __name__ == "__main__":
    # Manual smoke test: list input devices, then record and print the status
    # once per second until Ctrl+C.
    class TestFay:
        """Minimal stand-in for the core object that prints each interaction."""

        def on_interact(self, interact):
            print(f"收到交互: {interact}")

    class TestRecorder(RecorderSync):
        """Recorder that prints every recognition result."""

        def on_speaking(self, text):
            print(f"识别结果: {text}")

    recorder = TestRecorder(TestFay())

    available = recorder.list_audio_devices()
    print("可用音频设备:")
    for dev in available:
        print(f"  {dev['index']}: {dev['name']} ({dev['channels']}ch, {dev['sample_rate']}Hz)")

    if not available:
        print("没有可用的音频设备")
    else:
        print("\n开始录音测试,按Ctrl+C停止...")
        recorder.start_recording()

        try:
            while True:
                time.sleep(1)
                print(f"状态: {recorder.get_status()}")
        except KeyboardInterrupt:
            print("\n停止录音测试")
            recorder.stop_recording()