funasr_asr_sync.py 18.2 KB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472
# -*- coding: utf-8 -*-
"""
AIfeng/2025-01-02 10:27:06
FunASR语音识别模块 - 同步版本
基于eman-Fay-main-copy项目的同步实现模式
"""

import _thread as thread
import asyncio
import json
import os
import ssl
import threading
import time
from collections import deque
from threading import Thread

import websocket

from core import get_web_instance, get_instance
from utils import config_util as cfg
from utils import util

class FunASRSync:
    """Synchronous FunASR WebSocket client (modelled on the reference project).

    ``start()`` runs ``websocket.WebSocketApp.run_forever`` on a background
    ``threading.Thread``. Once the socket opens, ``on_open`` starts a sender
    thread that drains the outgoing frame queue (at most one frame every
    20 ms). Recognition results arrive via ``on_message`` and are handed to the
    callback registered with ``set_result_callback``.
    """

    # Plain-text messages containing any of these keywords (checked against
    # the lower-cased message) are treated as server status chatter, not as
    # recognition results.
    _STATUS_KEYWORDS = ('status', 'ready', '准备接收', 'processing', 'chunk')
    # Upper bound (seconds) for the exponential reconnect back-off.
    _MAX_RECONNECT_DELAY = 30
    # Payloads larger than this are sent in chunks. Per the original note, the
    # server's aiohttp message limit defaults to 1 MB and base64 inflates data
    # by ~33%, so 512 KB keeps a single JSON frame below that limit.
    _LARGE_FILE_THRESHOLD = 512 * 1024

    def __init__(self, username):
        """Create a client for *username*; no connection is opened yet.

        Args:
            username: user identifier echoed back in result ``chat_message``s.
        """
        self.__URL = "ws://{}:{}".format(cfg.local_asr_ip, cfg.local_asr_port)
        self.__ws = None
        self.__connected = False
        # Outgoing frame queue. Both the sender thread (on_open) and end() take
        # frames from the head; deque.popleft() removes each frame exactly once
        # and is O(1), unlike list.pop(0).
        self.__frames = deque()
        self.__state = 0
        self.__closing = False
        self.__task_id = ''
        self.done = False
        self.finalResults = ""
        self.__reconnect_delay = 1
        self.__reconnecting = False
        self.username = username
        self.started = True
        self.__result_callback = None  # receives a chat_message dict per result

        util.log(1, f"FunASR同步客户端初始化完成,用户: {username}")

    def on_message(self, ws, message):
        """Handle one message from the FunASR server (websocket-client callback).

        * JSON dict whose ``status`` is ``ready``, ``processing``,
          ``chunk_received`` or ``error``: logged and ignored.
        * JSON dict with a ``text`` field: structured result, accepted only if
          the text is a non-empty string.
        * Any other non-empty ``str``: plain-text result, unless it contains
          one of ``_STATUS_KEYWORDS``.

        An accepted result sets ``done``/``finalResults`` and fires the result
        callback, which also closes the socket if ``end()`` was called.
        Ignored status messages return early and leave the socket open, so a
        closing client does not drop the connection before a result arrives.
        Any other message closes the socket when closing.
        """
        try:
            util.log(1, f"收到FunASR消息: {message}")

            try:
                parsed_message = json.loads(message)
            except ValueError:
                # Not JSON (JSONDecodeError is a ValueError subclass): handle it
                # as plain text below.
                parsed_message = None

            if isinstance(parsed_message, dict):
                status = parsed_message.get('status')
                if status == 'ready':
                    util.log(1, f"收到分块准备状态: {parsed_message.get('message', '')}")
                    return
                if status in ('processing', 'chunk_received'):
                    util.log(1, f"收到处理状态: {status}")
                    return
                if status == 'error':
                    util.log(3, f"收到错误状态: {parsed_message.get('message', '')}")
                    return

                if 'text' in parsed_message:
                    # Tolerate "text": null / non-string values instead of
                    # raising on .strip().
                    recognition_text = parsed_message.get('text') or ''
                    if isinstance(recognition_text, str) and recognition_text.strip():
                        self.done = True
                        self.finalResults = recognition_text
                        util.log(1, f"收到结构化识别结果: {recognition_text}")
                        self._trigger_result_callback()
                    return

            if isinstance(message, str) and message.strip():
                lowered = message.lower()
                if any(keyword in lowered for keyword in self._STATUS_KEYWORDS):
                    util.log(1, f"跳过状态消息: {message}")
                    return

                self.done = True
                self.finalResults = message
                util.log(1, f"收到文本识别结果: {message}")
                # The callback helper already closes the socket when closing;
                # return so it is not closed a second time below.
                self._trigger_result_callback()
                return

        except Exception as e:
            util.log(3, f"处理识别结果时出错: {e}")

        self._close_if_closing()

    def _trigger_result_callback(self):
        """Deliver ``finalResults`` to the registered result callback.

        The callback receives a ``chat_message`` dict (sender "回音",
        model_info "Funasr"). Exceptions raised by the callback are logged and
        not propagated. Afterwards the socket is closed if ``end()`` requested
        shutdown.
        """
        if self.__result_callback:
            chat_message = {
                "type": "chat_message",
                "sender": "回音",
                "text": self.finalResults,
                "Username": self.username,
                "model_info": "Funasr"
            }
            try:
                self.__result_callback(chat_message)
                util.log(1, f"已触发结果回调: {self.finalResults}")
            except Exception as e:
                util.log(3, f"调用结果回调时出错: {e}")
        # A direct push to the web client (via get_web_instance) used to live
        # here and is disabled; results are delivered only through the callback.

        self._close_if_closing()

    def _close_if_closing(self):
        """Close the current socket if ``end()`` has set the closing flag.

        Does nothing when no socket is held (e.g. after on_close/on_error
        cleared it). Close errors are logged as warnings.
        """
        if not self.__closing:
            return
        ws = self.__ws
        if ws is None:
            return
        try:
            ws.close()
        except Exception as e:
            util.log(2, f"关闭WebSocket时出错: {e}")

    def on_close(self, ws, code, msg):
        """websocket-client close callback: mark disconnected and drop the socket."""
        self.__connected = False
        util.log(2, f"FunASR连接关闭: {msg}")
        self.__ws = None

    def on_error(self, ws, error):
        """websocket-client error callback: mark disconnected and drop the socket."""
        self.__connected = False
        util.log(3, f"FunASR连接错误: {error}")
        self.__ws = None

    def __attempt_reconnect(self):
        """Call start() with exponential back-off until the client is connected.

        The delay doubles after every attempt, capped at
        ``_MAX_RECONNECT_DELAY`` seconds, and is reset to 1 s afterwards.
        A call made while a reconnect loop is already running returns at once.

        NOTE(review): start() only launches the connection thread, so this loop
        may launch several connection attempts before one succeeds. No caller
        of this method is visible in this file.
        """
        if self.__reconnecting:
            return
        self.__reconnecting = True
        util.log(1, "尝试重连FunASR...")
        try:
            while not self.__connected:
                time.sleep(self.__reconnect_delay)
                self.start()
                self.__reconnect_delay = min(self.__reconnect_delay * 2,
                                             self._MAX_RECONNECT_DELAY)
        finally:
            self.__reconnect_delay = 1
            self.__reconnecting = False

    @staticmethod
    def _dispatch_frame(ws, frame):
        """Send one queued frame over *ws*.

        Dicts go out as JSON text frames, ``bytes`` as binary frames; frames of
        any other type are silently dropped.
        """
        if isinstance(frame, dict):
            ws.send(json.dumps(frame))
        elif isinstance(frame, bytes):
            ws.send(frame, websocket.ABNF.OPCODE_BINARY)

    def on_open(self, ws):
        """websocket-client open callback: mark connected and start the sender.

        The sender thread takes at most one frame from the queue head every
        20 ms and sends it. It exits once on_close/on_error/end() clear the
        connected flag. Send errors are logged and the frame is dropped.
        """
        self.__connected = True
        util.log(1, "FunASR WebSocket连接建立")

        def run(*args):
            while self.__connected:
                try:
                    frame = self.__frames.popleft()
                except IndexError:
                    pass  # nothing queued; poll again after the sleep
                else:
                    try:
                        self._dispatch_frame(ws, frame)
                    except Exception as e:
                        util.log(3, f"发送帧数据时出错: {e}")
                # Poll interval (reduced from 40 ms to 20 ms for throughput).
                time.sleep(0.02)

        thread.start_new_thread(run, ())

    def __connect(self):
        """Open the WebSocket and block in run_forever() until it closes.

        Runs on the thread launched by start(). Per-session state is reset by
        start() *before* this thread exists, so nothing queued by start() can be
        wiped here.
        """
        websocket.enableTrace(False)

        app = websocket.WebSocketApp(
            self.__URL,
            on_open=self.on_open,
            on_message=self.on_message,
            on_close=self.on_close,
            on_error=self.on_error
        )
        self.__ws = app
        # sslopt only matters for wss:// URLs; __URL is built as plain ws://.
        app.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})

    def add_frame(self, frame):
        """Queue *frame* (a dict for JSON or ``bytes`` for binary) for the sender."""
        self.__frames.append(frame)

    def send(self, buf):
        """Queue audio data for the sender; only ``bytes`` objects are transmitted."""
        self.__frames.append(buf)

    def send_url(self, url):
        """Send a local audio file path to FunASR as ``{'url': <absolute path>}``.

        Relative paths are made absolute first because the FunASR service cannot
        resolve them. The frame is sent immediately (bypassing the queue) when
        connected; otherwise a warning is logged. Exceptions from ``ws.send``
        propagate to the caller.
        """
        absolute_url = os.path.abspath(url)
        frame = {'url': absolute_url}
        ws = self.__ws  # local ref: on_close may clear self.__ws concurrently
        if ws and self.__connected:
            util.log(1, f"发送音频文件URL到FunASR: {absolute_url}")
            ws.send(json.dumps(frame))
            util.log(1, f"音频文件URL已发送: {frame}")
        else:
            util.log(2, f"WebSocket未连接,无法发送URL: {absolute_url}")

    def send_audio_data(self, audio_bytes, filename="audio.wav"):
        """Send raw audio bytes to FunASR, chunking payloads above 512 KB.

        Args:
            audio_bytes: ``bytes``/``bytearray``, or any object with a
                ``tobytes()`` method (``memoryview``, ``array.array``, numpy
                arrays). The latter are copied to ``bytes`` first.
            filename: name reported to the server alongside the data.

        Returns:
            True on success, False on any failure (errors are logged).
        """
        try:
            if hasattr(audio_bytes, 'tobytes'):
                # Take an owned bytes copy so no exported buffer is held
                # (avoids "BufferError: memoryview has 1 exported buffer").
                audio_bytes = audio_bytes.tobytes()

            total_size = len(audio_bytes)

            if total_size > self._LARGE_FILE_THRESHOLD:
                util.log(1, f"检测到大文件({total_size} bytes),使用分块发送模式")
                return self._send_audio_data_chunked(audio_bytes, filename)
            return self._send_audio_data_simple(audio_bytes, filename)

        except Exception as e:
            util.log(3, f"发送音频数据时出错: {e}")
            return False

    def _send_audio_data_simple(self, audio_bytes, filename):
        """Send the whole payload as one base64 JSON frame (small files).

        The frame shape ``{'audio_data', 'filename'}`` matches the FunASR
        service's process_audio_data handler. Returns True on success.
        """
        import base64

        try:
            frame = {
                'audio_data': base64.b64encode(audio_bytes).decode('utf-8'),
                'filename': filename
            }

            if not (self.__ws and self.__connected):
                util.log(2, f"WebSocket未连接,无法发送音频数据: {filename}")
                return False

            util.log(1, f"发送音频数据到FunASR: {filename}, 大小: {len(audio_bytes)} bytes")
            if self._send_frame_with_retry(frame):
                util.log(1, f"音频数据已发送: {filename}")
                return True
            util.log(3, f"音频数据发送失败: {filename}")
            return False

        except Exception as e:
            util.log(3, f"简单发送音频数据时出错: {e}")
            return False

    def _send_audio_data_chunked(self, audio_bytes, filename, chunk_size=512*1024):
        """Send a large payload as ``audio_start``, N ``audio_chunk``, ``audio_end``.

        Each chunk is base64-encoded and carries its index and an ``is_last``
        flag. A 10 ms pause separates chunks. Returns True only if every frame
        was sent; a non-positive *chunk_size* is rejected.
        """
        import base64
        import math

        if chunk_size <= 0:
            util.log(3, f"无效的分块大小: {chunk_size}")
            return False

        try:
            total_size = len(audio_bytes)
            total_chunks = math.ceil(total_size / chunk_size)

            util.log(1, f"开始分块发送: {filename}, 总大小: {total_size} bytes, 分块数: {total_chunks}")

            start_frame = {
                'type': 'audio_start',
                'filename': filename,
                'total_size': total_size,
                'total_chunks': total_chunks,
                'chunk_size': chunk_size
            }
            if not self._send_frame_with_retry(start_frame):
                util.log(3, f"发送开始信号失败: {filename}")
                return False

            for i in range(total_chunks):
                start_pos = i * chunk_size
                chunk_data = audio_bytes[start_pos:start_pos + chunk_size]

                chunk_frame = {
                    'type': 'audio_chunk',
                    'filename': filename,
                    'chunk_index': i,
                    'chunk_data': base64.b64encode(chunk_data).decode('utf-8'),
                    'is_last': (i == total_chunks - 1)
                }
                if not self._send_frame_with_retry(chunk_frame):
                    util.log(3, f"分块 {i+1}/{total_chunks} 发送失败")
                    return False

                if (i + 1) % 10 == 0 or i == total_chunks - 1:
                    progress = ((i + 1) / total_chunks) * 100
                    util.log(1, f"发送进度: {progress:.1f}% ({i+1}/{total_chunks})")

                time.sleep(0.01)  # flow control between chunks

            end_frame = {
                'type': 'audio_end',
                'filename': filename
            }
            if self._send_frame_with_retry(end_frame):
                util.log(1, f"音频数据分块发送完成: {filename}")
                return True
            util.log(3, f"发送结束信号失败: {filename}")
            return False

        except Exception as e:
            util.log(3, f"分块发送音频数据时出错: {e}")
            return False

    def _send_frame_with_retry(self, frame, max_retries=3, timeout=10):
        """Send *frame* as a JSON text frame, retrying on failure.

        A retry happens only when the socket is unavailable or the send raises.
        A send that completes but takes *timeout* seconds or longer is logged
        and still counted as success: the frame has already been written, so
        resending it would duplicate data on the server. The back-off between
        attempts grows linearly (0.5 s, 1.0 s, ...).

        Returns:
            True once the frame is sent, False after *max_retries* failures.
        """
        for attempt in range(max_retries):
            ws = self.__ws  # local ref: on_close may clear self.__ws concurrently
            if ws and self.__connected:
                try:
                    start_time = time.monotonic()
                    ws.send(json.dumps(frame))
                    # Short pause after every frame to pace consecutive sends.
                    time.sleep(0.05)
                    elapsed = time.monotonic() - start_time
                    if elapsed >= timeout:
                        util.log(2, f"发送耗时 {elapsed:.2f}s,超过 {timeout}s(帧已发出,不重发)")
                    return True
                except Exception as e:
                    util.log(2, f"发送失败,尝试 {attempt + 1}/{max_retries}: {e}")
            else:
                util.log(2, f"连接不可用,尝试 {attempt + 1}/{max_retries}")

            if attempt < max_retries - 1:
                time.sleep(0.5 * (attempt + 1))  # linear back-off

        return False

    def set_result_callback(self, callback):
        """Register *callback*; it is called with a chat_message dict per result."""
        self.__result_callback = callback
        util.log(1, "已设置结果回调函数")

    def connect(self):
        """Connect to the FunASR service and wait synchronously for the socket.

        Calls start() when not already connected, then polls every 0.1 s for up
        to 30 s (generous, to allow for large-file processing), logging progress
        about every 5 s. Elapsed time comes from time.monotonic(), so the
        reported duration is real time rather than a sum of sleep intervals.

        Returns:
            True if connected (already, or within the wait window); False on
            timeout or if an exception occurs.
        """
        try:
            if self.__connected:
                return True

            self.start()

            max_wait_time = 30.0
            wait_interval = 0.1
            progress_interval = 5.0
            next_progress_log = progress_interval
            started_at = time.monotonic()
            waited_time = 0.0

            while not self.__connected and waited_time < max_wait_time:
                time.sleep(wait_interval)
                waited_time = time.monotonic() - started_at
                if waited_time >= next_progress_log:
                    util.log(1, f"等待FunASR连接中... {waited_time:.1f}s/{max_wait_time}s")
                    next_progress_log += progress_interval

            if self.__connected:
                util.log(1, f"FunASR连接成功,耗时: {waited_time:.2f}秒")
            else:
                util.log(3, f"FunASR连接超时,等待了{waited_time:.2f}秒")

            return self.__connected
        except Exception as e:
            util.log(3, f"连接FunASR服务时出错: {e}")
            return False

    def start(self):
        """Reset session state, queue StartTranscription and launch the connection.

        State is reset here, before the connection thread starts, so the
        StartTranscription frame cannot be cleared by a racing reset on that
        thread. The closing flag is also cleared so a restarted client does not
        inherit a previous end().
        """
        self.finalResults = ""
        self.done = False
        self.__closing = False
        self.__frames.clear()
        self.add_frame({
            'vad_need': False,
            'state': 'StartTranscription'
        })
        Thread(target=self.__connect).start()
        util.log(1, "FunASR客户端启动")

    def is_connected(self):
        """Return True while the WebSocket is open."""
        return self.__connected

    def end(self):
        """Flush queued frames, send StopTranscription and mark the client closing.

        The socket is not closed here: on_message closes it once the next
        non-status message (normally the final result) arrives. Send errors
        are logged. The connected flag is cleared, which stops the sender thread.
        """
        ws = self.__ws
        if self.__connected and ws is not None:
            try:
                # Drain with popleft() so every frame is sent exactly once, even
                # if the sender thread is taking frames concurrently. (Popping
                # inside a for-loop over the same list skipped every other frame.)
                while True:
                    try:
                        frame = self.__frames.popleft()
                    except IndexError:
                        break
                    self._dispatch_frame(ws, frame)

                ws.send(json.dumps({'vad_need': False, 'state': 'StopTranscription'}))

            except Exception as e:
                util.log(3, f"结束FunASR时出错: {e}")

        self.__closing = True
        self.__connected = False
        util.log(1, "FunASR客户端结束")