funasr.py
4.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# -*- coding: utf-8 -*-
"""
AIfeng/2025-01-27
FunASR WebSocket客户端 - 兼容性包装器
基于新的FunASRClient实现
"""
import sys
import os
# 添加项目根目录到路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
from funasr_asr import FunASRClient
# 修复导入路径
try:
from core import get_web_instance, get_instance
except ImportError:
# 如果core模块不存在,创建一个模拟的函数
def get_web_instance():
return MockWebInstance()
def get_instance():
return MockInstance()
class MockWebInstance:
def is_connected(self, username):
return False
def add_cmd(self, cmd):
print(f"Mock Web: {cmd}")
class MockInstance:
def is_connected_human(self, username):
return False
def add_cmd(self, cmd):
print(f"Mock Human: {cmd}")
try:
from utils import config_util as cfg
except ImportError:
# 使用项目根目录的config_util
import config_util as cfg
try:
from utils import util
except ImportError:
# 使用项目根目录的util
import util
class FunASR:
"""FunASR兼容性包装器"""
def __init__(self, username):
# 创建一个简单的选项对象
class SimpleOpt:
def __init__(self, username):
self.username = username
opt = SimpleOpt(username)
self.client = FunASRClient(opt)
self.username = username
self.__connected = False
self.__frames = []
self.__state = 0
self.__closing = False
self.__task_id = ''
self.done = False
self.finalResults = ""
self.__reconnect_delay = 1
self.__reconnecting = False
self.started = True
# 消息处理回调
self.on_message_callback = None
# 设置结果回调
self.client.set_result_callback(self._handle_result)
def set_message_callback(self, callback):
"""设置消息回调函数"""
self.on_message_callback = callback
def _handle_result(self, message):
"""处理识别结果的内部方法"""
try:
self.done = True
self.finalResults = message
# 调用用户设置的回调函数
if self.on_message_callback:
self.on_message_callback(message)
if get_web_instance().is_connected(self.username):
import asyncio
# 创建chat_message直接推送
chat_message = {
"type": "chat_message",
"sender": "回音",
"content": self.finalResults, # 修复字段名:panelMsg -> content
"Username": self.username,
"model_info": "FunASR"
}
# 使用直接发送方法,避免wsa_command封装
asyncio.create_task(get_web_instance().send_direct_message(chat_message))
# Human客户端通知改为日志记录(避免重复通知当前服务)
util.log(1, f"FunASR识别结果[{self.username}]: {self.finalResults}")
except Exception as e:
print(e)
# 兼容性方法
def on_message(self, ws, message):
"""兼容性方法 - 收到websocket消息的处理"""
self._handle_result(message)
def on_close(self, ws, code, msg):
"""兼容性方法 - 收到websocket错误的处理"""
self.__connected = False
# util.printInfo(1, self.username, f"### CLOSE:{msg}")
def on_error(self, ws, error):
"""兼容性方法 - 收到websocket错误的处理"""
self.__connected = False
# util.printInfo(1, self.username, f"### error:{error}")
def on_open(self, ws):
"""兼容性方法 - 收到websocket连接建立的处理"""
self.__connected = True
def add_frame(self, frame):
"""兼容性方法 - 添加音频帧"""
if isinstance(frame, bytes):
self.client.send_audio(frame)
else:
# 对于字典类型的控制消息,暂时忽略
pass
def send(self, buf):
"""兼容性方法 - 发送音频数据"""
if isinstance(buf, bytes):
self.client.send_audio(buf)
def send_url(self, url):
"""兼容性方法 - 发送URL(新客户端不支持此功能)"""
print(f"警告: send_url功能在新的FunASR客户端中不支持: {url}")
def start(self):
"""兼容性方法 - 启动识别"""
self.client.start_recognition()
self.__connected = True
self.done = False
self.finalResults = ""
def end(self):
"""兼容性方法 - 结束识别"""
self.client.stop_recognition()
self.__closing = True
self.__connected = False