冯杨

Doubao (豆包) large model: name assignment.

Speech recognition (ASR): uses the local FunASR solution (reusing the FunASR setup from the Fay project).
FunASR service connection test script:
used to verify that the local FunASR WebSocket service can be connected to normally (a minimal connectivity sketch follows these notes).
Further adjustments to the dialog box in webrtcapichat.html; the sidebar now includes a transparency control for the dialog. For now the dialog background colors are kept clearly distinct; visual design is deferred. The dialog can also be hidden.
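As a reference for the connection test mentioned above, here is a minimal connectivity sketch. It assumes the local FunASR WebSocket service (ASR_server.py below) is listening on 127.0.0.1:10197; the address, port, and the websockets dependency are taken from the scripts in this log.

import asyncio
import websockets

async def check_funasr(host="127.0.0.1", port=10197):
    # Try to open a WebSocket connection to the local FunASR service.
    uri = f"ws://{host}:{port}"
    try:
        async with websockets.connect(uri, subprotocols=["binary"], ping_interval=None):
            print(f"FunASR service reachable at {uri}")
            return True
    except Exception as e:
        print(f"FunASR service NOT reachable at {uri}: {e}")
        return False

if __name__ == "__main__":
    asyncio.run(check_funasr())

... ...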
from threading import Thread
from threading import Lock
import websocket
import json
import time
import ssl
import wave
import _thread as thread
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
from core import wsa_server
from scheduler.thread_manager import MyThread
from utils import util
from utils import config_util as cfg
from core.authorize_tb import Authorize_Tb
__running = True
__my_thread = None
_token = ''
def __post_token():
global _token
__client = AcsClient(
cfg.key_ali_nls_key_id,
cfg.key_ali_nls_key_secret,
"cn-shanghai"
)
__request = CommonRequest()
__request.set_method('POST')
__request.set_domain('nls-meta.cn-shanghai.aliyuncs.com')
__request.set_version('2019-02-28')
__request.set_action_name('CreateToken')
info = json.loads(__client.do_action_with_exception(__request))
_token = info['Token']['Id']
authorize = Authorize_Tb()
authorize_info = authorize.find_by_userid(cfg.key_ali_nls_key_id)
if authorize_info is not None:
authorize.update_by_userid(cfg.key_ali_nls_key_id, _token, info['Token']['ExpireTime']*1000)
else:
authorize.add(cfg.key_ali_nls_key_id, _token, info['Token']['ExpireTime']*1000)
def __runnable():
while __running:
__post_token()
time.sleep(60 * 60 * 12)
def start():
MyThread(target=__runnable).start()
class ALiNls:
    # initialization
def __init__(self, username):
self.__URL = 'wss://nls-gateway-cn-shenzhen.aliyuncs.com/ws/v1'
self.__ws = None
self.__frames = []
self.started = False
self.__closing = False
self.__task_id = ''
self.done = False
self.finalResults = ""
self.username = username
self.data = b''
self.__endding = False
self.__is_close = False
self.lock = Lock()
def __create_header(self, name):
if name == 'StartTranscription':
self.__task_id = util.random_hex(32)
header = {
"appkey": cfg.key_ali_nls_app_key,
"message_id": util.random_hex(32),
"task_id": self.__task_id,
"namespace": "SpeechTranscriber",
"name": name
}
return header
    # handle incoming WebSocket messages
def on_message(self, ws, message):
try:
data = json.loads(message)
header = data['header']
name = header['name']
if name == 'TranscriptionStarted':
self.started = True
if name == 'SentenceEnd':
self.done = True
self.finalResults = data['payload']['result']
if wsa_server.get_web_instance().is_connected(self.username):
wsa_server.get_web_instance().add_cmd({"panelMsg": self.finalResults, "Username" : self.username})
if wsa_server.get_instance().is_connected_human(self.username):
content = {'Topic': 'human', 'Data': {'Key': 'log', 'Value': self.finalResults}, 'Username' : self.username}
wsa_server.get_instance().add_cmd(content)
ws.close()#TODO
elif name == 'TranscriptionResultChanged':
self.finalResults = data['payload']['result']
if wsa_server.get_web_instance().is_connected(self.username):
wsa_server.get_web_instance().add_cmd({"panelMsg": self.finalResults, "Username" : self.username})
if wsa_server.get_instance().is_connected_human(self.username):
content = {'Topic': 'human', 'Data': {'Key': 'log', 'Value': self.finalResults}, 'Username' : self.username}
wsa_server.get_instance().add_cmd(content)
except Exception as e:
print(e)
# print("### message:", message)
    # handle the WebSocket close request
def on_close(self, ws, code, msg):
self.__endding = True
self.__is_close = True
    # handle WebSocket errors
def on_error(self, ws, error):
print("aliyun asr error:", error)
        self.started = True  # prevent the recorder from waiting forever for the started flag when Aliyun ASR errors out
    # handle WebSocket connection established
def on_open(self, ws):
self.__endding = False
        # for multi-channel ASR compatibility, finish sending in-flight data when closing
def run(*args):
while self.__endding == False:
try:
if len(self.__frames) > 0:
with self.lock:
frame = self.__frames.pop(0)
if isinstance(frame, dict):
ws.send(json.dumps(frame))
elif isinstance(frame, bytes):
ws.send(frame, websocket.ABNF.OPCODE_BINARY)
self.data += frame
else:
                        time.sleep(0.001)  # avoid busy-waiting
except Exception as e:
print(e)
break
if self.__is_close == False:
for frame in self.__frames:
ws.send(frame, websocket.ABNF.OPCODE_BINARY)
frame = {"header": self.__create_header('StopTranscription')}
ws.send(json.dumps(frame))
thread.start_new_thread(run, ())
def __connect(self):
self.finalResults = ""
self.done = False
with self.lock:
self.__frames.clear()
self.__ws = websocket.WebSocketApp(self.__URL + '?token=' + _token, on_message=self.on_message)
self.__ws.on_open = self.on_open
self.__ws.on_error = self.on_error
self.__ws.on_close = self.on_close
self.__ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
def send(self, buf):
with self.lock:
self.__frames.append(buf)
def start(self):
Thread(target=self.__connect, args=[]).start()
data = {
'header': self.__create_header('StartTranscription'),
"payload": {
"format": "pcm",
"sample_rate": 16000,
"enable_intermediate_result": True,
"enable_punctuation_prediction": False,
"enable_inverse_text_normalization": True,
"speech_noise_threshold": -1
}
}
self.send(data)
def end(self):
self.__endding = True
with wave.open('cache_data/input2.wav', 'wb') as wf:
            # set audio parameters
            n_channels = 1  # mono
            sampwidth = 2   # 16-bit audio, 2 bytes per sample
wf.setnchannels(n_channels)
wf.setsampwidth(sampwidth)
wf.setframerate(16000)
wf.writeframes(self.data)
self.data = b''
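# Usage sketch (illustrative only, not part of the original file): drive ALiNls with
# 16 kHz, 16-bit mono PCM chunks. It assumes Fay's config has been loaded with valid
# Aliyun NLS credentials and that the wsa_server endpoints are running; pcm_chunks
# below is a hypothetical placeholder for a real audio source.
if __name__ == '__main__':
    start()                  # begin the periodic Aliyun token refresh
    asr = ALiNls('User')
    asr.start()              # connect and queue the StartTranscription command
    pcm_chunks = []          # hypothetical: 16 kHz, 16-bit mono PCM byte chunks
    for chunk in pcm_chunks:
        asr.send(chunk)
    asr.end()                # queue StopTranscription and dump cache_data/input2.wav
    print(asr.finalResults)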
... ...
# -*- coding: utf-8 -*-
"""
AIfeng/2025-01-27
FunASR WebSocket client - compatibility wrapper
Built on the new FunASRClient implementation
"""
import sys
import os
# add the project root directory to sys.path
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
from funasr_asr import FunASRClient
# fix import paths
try:
from core import wsa_server
except ImportError:
    # if the core module is unavailable, fall back to a mock wsa_server
class MockWSAServer:
def get_web_instance(self):
return MockWebInstance()
def get_instance(self):
return MockInstance()
class MockWebInstance:
def is_connected(self, username):
return False
def add_cmd(self, cmd):
print(f"Mock Web: {cmd}")
class MockInstance:
def is_connected_human(self, username):
return False
def add_cmd(self, cmd):
print(f"Mock Human: {cmd}")
wsa_server = MockWSAServer()
try:
from utils import config_util as cfg
except ImportError:
    # fall back to config_util in the project root
import config_util as cfg
try:
from utils import util
except ImportError:
    # fall back to util in the project root
import util
class FunASR:
"""FunASR兼容性包装器"""
def __init__(self, username):
        # create a simple options object
class SimpleOpt:
def __init__(self, username):
self.username = username
opt = SimpleOpt(username)
self.client = FunASRClient(opt)
self.username = username
self.__connected = False
self.__frames = []
self.__state = 0
self.__closing = False
self.__task_id = ''
self.done = False
self.finalResults = ""
self.__reconnect_delay = 1
self.__reconnecting = False
self.started = True
        # message-handling callback
self.on_message_callback = None
        # register the result callback
self.client.set_result_callback(self._handle_result)
def set_message_callback(self, callback):
"""设置消息回调函数"""
self.on_message_callback = callback
def _handle_result(self, message):
"""处理识别结果的内部方法"""
try:
self.done = True
self.finalResults = message
            # invoke the user-provided callback, if any
if self.on_message_callback:
self.on_message_callback(message)
if wsa_server.get_web_instance().is_connected(self.username):
wsa_server.get_web_instance().add_cmd({"panelMsg": self.finalResults, "Username" : self.username})
if wsa_server.get_instance().is_connected_human(self.username):
content = {'Topic': 'human', 'Data': {'Key': 'log', 'Value': self.finalResults}, 'Username' : self.username}
wsa_server.get_instance().add_cmd(content)
except Exception as e:
print(e)
    # compatibility methods
def on_message(self, ws, message):
"""兼容性方法 - 收到websocket消息的处理"""
self._handle_result(message)
def on_close(self, ws, code, msg):
"""兼容性方法 - 收到websocket错误的处理"""
self.__connected = False
# util.printInfo(1, self.username, f"### CLOSE:{msg}")
def on_error(self, ws, error):
"""兼容性方法 - 收到websocket错误的处理"""
self.__connected = False
# util.printInfo(1, self.username, f"### error:{error}")
def on_open(self, ws):
"""兼容性方法 - 收到websocket连接建立的处理"""
self.__connected = True
def add_frame(self, frame):
"""兼容性方法 - 添加音频帧"""
if isinstance(frame, bytes):
self.client.send_audio(frame)
else:
            # dict-type control messages are ignored for now
pass
def send(self, buf):
"""兼容性方法 - 发送音频数据"""
if isinstance(buf, bytes):
self.client.send_audio(buf)
def send_url(self, url):
"""兼容性方法 - 发送URL(新客户端不支持此功能)"""
print(f"警告: send_url功能在新的FunASR客户端中不支持: {url}")
def start(self):
"""兼容性方法 - 启动识别"""
self.client.start_recognition()
self.__connected = True
self.done = False
self.finalResults = ""
def end(self):
"""兼容性方法 - 结束识别"""
self.client.stop_recognition()
self.__closing = True
self.__connected = False
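# Usage sketch (illustrative only, not part of the original file): drive the wrapper the
# same way recorder code drives ALiNls. It assumes the local FunASR WebSocket service is
# running and reachable by FunASRClient; pcm_chunks is a hypothetical 16 kHz, 16-bit mono
# PCM source.
if __name__ == '__main__':
    asr = FunASR('User')
    asr.set_message_callback(lambda text: print('ASR result:', text))
    asr.start()              # start recognition on the underlying FunASRClient
    pcm_chunks = []          # hypothetical audio chunks
    for chunk in pcm_chunks:
        asr.send(chunk)
    asr.end()                # stop recognition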
... ...
import pyaudio
import websockets
import asyncio
from queue import Queue
import argparse
import json
parser = argparse.ArgumentParser()
parser.add_argument("--host", type=str, default="127.0.0.1", required=False, help="host ip, localhost, 0.0.0.0")
parser.add_argument("--port", type=int, default=10197, required=False, help="grpc server port")
parser.add_argument("--chunk_size", type=int, default=160, help="ms")
parser.add_argument("--vad_needed", type=bool, default=True)
args = parser.parse_args()
voices = Queue()
async def record():
global voices
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
CHUNK = int(RATE / 1000 * args.chunk_size)
p = pyaudio.PyAudio()
stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)
while True:
data = stream.read(CHUNK)
voices.put(data)
await asyncio.sleep(0.01)
async def ws_send(websocket):
global voices
print("Started sending data!")
data_head = {
'vad_need': args.vad_needed,
'state': ''
}
await websocket.send(json.dumps(data_head))
while True:
while not voices.empty():
data = voices.get()
voices.task_done()
try:
await websocket.send(data)
except Exception as e:
print('Exception occurred:', e)
return # Return to attempt reconnection
await asyncio.sleep(0.01)
async def message(websocket):
while True:
try:
print(await websocket.recv())
except Exception as e:
print("Exception:", e)
return # Return to attempt reconnection
async def ws_client():
uri = "ws://{}:{}".format(args.host, args.port)
while True:
try:
async with websockets.connect(uri, subprotocols=["binary"], ping_interval=None) as websocket:
task1 = asyncio.create_task(record())
task2 = asyncio.create_task(ws_send(websocket))
task3 = asyncio.create_task(message(websocket))
await asyncio.gather(task1, task2, task3)
except Exception as e:
print("WebSocket connection failed: ", e)
await asyncio.sleep(5) # Wait for 5 seconds before trying to reconnect
asyncio.get_event_loop().run_until_complete(ws_client())
... ...
import asyncio
import websockets
import argparse
import json
import logging
from funasr import AutoModel
import os
# set the log level
logger = logging.getLogger(__name__)
logger.setLevel(logging.CRITICAL)
# parse command-line arguments
parser = argparse.ArgumentParser()
parser.add_argument("--host", type=str, default="0.0.0.0", help="host ip, localhost, 0.0.0.0")
parser.add_argument("--port", type=int, default=10197, help="grpc server port")
parser.add_argument("--ngpu", type=int, default=1, help="0 for cpu, 1 for gpu")
parser.add_argument("--gpu_id", type=int, default=0, help="specify which gpu device to use")
args = parser.parse_args()
# initialize the model
print("model loading")
asr_model = AutoModel(model="paraformer-zh", model_revision="v2.0.4",
vad_model="fsmn-vad", vad_model_revision="v2.0.4",
punc_model="ct-punc-c", punc_model_revision="v2.0.4",
device=f"cuda:{args.gpu_id}" if args.ngpu else "cpu", disable_update=True)
# ,disable_update=True
print("model loaded")
websocket_users = {}
task_queue = asyncio.Queue()
async def ws_serve(websocket, path):
global websocket_users
user_id = id(websocket)
websocket_users[user_id] = websocket
try:
async for message in websocket:
if isinstance(message, str):
data = json.loads(message)
if 'url' in data:
await task_queue.put((websocket, data['url']))
except websockets.exceptions.ConnectionClosed as e:
logger.info(f"Connection closed: {e.reason}")
except Exception as e:
logger.error(f"Unexpected error: {e}")
finally:
logger.info(f"Cleaning up connection for user {user_id}")
if user_id in websocket_users:
del websocket_users[user_id]
await websocket.close()
logger.info("WebSocket closed")
async def worker():
while True:
websocket, url = await task_queue.get()
if websocket.open:
await process_wav_file(websocket, url)
else:
logger.info("WebSocket connection is already closed when trying to process file")
task_queue.task_done()
async def process_wav_file(websocket, url):
    # hotwords
param_dict = {"sentence_timestamp": False}
with open("data/hotword.txt", "r", encoding="utf-8") as f:
lines = f.readlines()
lines = [line.strip() for line in lines]
hotword = " ".join(lines)
print(f"热词:{hotword}")
param_dict["hotword"] = hotword
wav_path = url
try:
res = asr_model.generate(input=wav_path, is_final=True, **param_dict)
if res:
if 'text' in res[0] and websocket.open:
await websocket.send(res[0]['text'])
except Exception as e:
print(f"Error during model.generate: {e}")
finally:
if os.path.exists(wav_path):
os.remove(wav_path)
async def main():
server = await websockets.serve(ws_serve, args.host, args.port, ping_interval=10)
worker_task = asyncio.create_task(worker())
    try:
        # keep the server running until interrupted
        print(f"ASR server started, listening on: {args.host}:{args.port}")
        await asyncio.Future()  # wait forever until the program is interrupted
    except asyncio.CancelledError:
        print("Server is shutting down...")
    finally:
        # clean up resources
worker_task.cancel()
try:
await worker_task
except asyncio.CancelledError:
pass
server.close()
await server.wait_closed()
# run the main coroutine with asyncio
asyncio.run(main())
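# Client-side sketch for this server (illustrative only, not part of the original file):
# the server expects a JSON text message whose "url" field is a wav path readable on the
# server machine, runs recognition, sends back the text, then deletes the file, so point
# it at a disposable copy. Host, port, and the file path below are assumptions.
async def recognize(path, host="127.0.0.1", port=10197):
    async with websockets.connect(f"ws://{host}:{port}") as ws:
        await ws.send(json.dumps({"url": path}))  # server-side path to a 16 kHz wav
        print(await ws.recv())                    # recognized text
# Example: asyncio.run(recognize("/tmp/sample_copy.wav"))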
... ...
## Speech Service Overview
This service is built on ModelScope FunASR speech recognition.
## Install
pip install torch -i https://mirrors.aliyun.com/pypi/simple/
pip install modelscope -i https://mirrors.aliyun.com/pypi/simple/
pip install testresources -i https://mirrors.aliyun.com/pypi/simple/
pip install websockets -i https://mirrors.aliyun.com/pypi/simple/
pip install torchaudio -i https://mirrors.aliyun.com/pypi/simple/
pip install FunASR -i https://mirrors.aliyun.com/pypi/simple/
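A quick post-install sanity check (a minimal sketch; run it in the same Python environment used above) to confirm the core dependencies import cleanly:
python -c "import torch, torchaudio, websockets, funasr; print('ok, torch', torch.__version__)"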
## Start server
python -u ASR_server.py --host "0.0.0.0" --port 10197 --ngpu 0
## Fay connect
Update the relevant configuration items in fay/system.conf, then restart Fay.
https://www.bilibili.com/video/BV1qs4y1g74e/?share_source=copy_web&vd_source=64cd9062f5046acba398177b62bea9ad
## Acknowledgements
Thanks to
1. Zhang Congcong (张聪聪), algorithm engineer at 中科大脑
2. [cgisky1980](https://github.com/cgisky1980/FunASR)
3. [modelscope](https://github.com/modelscope/modelscope)
4. [FunASR](https://github.com/alibaba-damo-academy/FunASR)
5. [Fay digital human assistant](https://github.com/TheRamU/Fay).
--------------------------------------------------------------------------------------
GPU server deployment:
GPU server LAN address: 10.110.3.219
Available GPU: GPU 1 (a quick GPU visibility check appears at the end of this section)
python -u ASR_server.py --host "10.110.3.219" --port 10197 --ngpu 1 --gpu_id 1
python -u ASR_server.py --host "127.0.0.1" --port 10197 --ngpu 1
lsof -i :10197    # check whether port 10197 is already in use
#!/bin/bash
source /home/fengyang/anaconda3/bin/activate livetalking
nohup python /home/fengyang/controlPanel/main.py >> /home/fengyang/controlPanel/panel_logfile.log 2>&1 &
chmod +x serverFunasr.sh
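Before starting the server with --ngpu 1, a quick check (a minimal sketch, assuming the PyTorch installed above) that CUDA and the target GPU are visible:
python -c "import torch; print('cuda available:', torch.cuda.is_available(), '| device count:', torch.cuda.device_count())"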
... ...
'''
Copyright FunASR (https://github.com/alibaba-damo-academy/FunASR). All Rights
Reserved. MIT License (https://opensource.org/licenses/MIT)
2022-2023 by zhaomingwork@qq.com
'''
# pip install websocket-client
import ssl
from websocket import ABNF
from websocket import create_connection
from queue import Queue
import threading
import traceback
import json
import time
import numpy as np
import pyaudio
import asyncio
import argparse
# WebSocket-based recognizer class
class Funasr_websocket_recognizer():
'''
python asr recognizer lib
'''
parser = argparse.ArgumentParser()
parser.add_argument("--host", type=str, default="127.0.0.1", required=False, help="host ip, localhost, 0.0.0.0")
parser.add_argument("--port", type=int, default=10194, required=False, help="grpc server port")
parser.add_argument("--chunk_size", type=int, default=160, help="ms")
parser.add_argument("--vad_needed", type=bool, default=True)
args = parser.parse_args()
def __init__(self, host="127.0.0.1",
port="10197",
is_ssl=True,
chunk_size="0, 10, 5",
chunk_interval=10,
mode="2pass",
wav_name="default"):
'''
host: server host ip
port: server port
is_ssl: True for wss protocol, False for ws
'''
try:
if is_ssl == True:
ssl_context = ssl.SSLContext()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
uri = "wss://{}:{}".format(host, port)
ssl_opt={"cert_reqs": ssl.CERT_NONE}
else:
uri = "ws://{}:{}".format(host, port)
ssl_context = None
ssl_opt=None
self.host = host
self.port = port
self.msg_queue = Queue() # used for recognized result text
print("connect to url",uri)
self.websocket=create_connection(uri, ssl=ssl_context, sslopt=ssl_opt)
self.thread_msg = threading.Thread(target=Funasr_websocket_recognizer.thread_rec_msg, args=(self,))
self.thread_msg.start()
chunk_size = [int(x) for x in chunk_size.split(",")]
message = json.dumps({"mode": mode,
"chunk_size": chunk_size,
"encoder_chunk_look_back": 4,
"decoder_chunk_look_back": 1,
"chunk_interval": chunk_interval,
"wav_name": wav_name,
"is_speaking": True})
self.websocket.send(message)
print("send json",message)
except Exception as e:
print("Exception:", e)
traceback.print_exc()
# async def record():
# global voices
# FORMAT = pyaudio.paInt16
# CHANNELS = 1
# RATE = 16000
# CHUNK = int(RATE / 1000 * args.chunk_size)
# p = pyaudio.PyAudio()
# stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)
# while True:
# data = stream.read(CHUNK)
# voices.put(data)
# await asyncio.sleep(0.01)
    # thread for receiving messages
def thread_rec_msg(self):
try:
while(True):
msg=self.websocket.recv()
if msg is None or len(msg) == 0:
continue
msg = json.loads(msg)
self.msg_queue.put(msg)
except Exception as e:
print("client closed")
    # feed a chunk of audio to the ASR engine; wait_time is how long to wait for a result before timing out
def feed_chunk(self, chunk, wait_time=0.01):
try:
self.websocket.send(chunk, ABNF.OPCODE_BINARY)
# loop to check if there is a message, timeout in 0.01s
while(True):
msg = self.msg_queue.get(timeout=wait_time)
if self.msg_queue.empty():
break
return msg
except:
return ""
def close(self,timeout=1):
message = json.dumps({"is_speaking": False})
self.websocket.send(message)
# sleep for timeout seconds to wait for result
time.sleep(timeout)
msg=""
while(not self.msg_queue.empty()):
msg = self.msg_queue.get()
self.websocket.close()
        # only return the last msg
return msg
if __name__ == '__main__':
print('example for Funasr_websocket_recognizer')
import wave
wav_path = "long.wav"
# wav_path = "/Users/zhifu/Downloads/modelscope_models/speech_seaco_paraformer_large_asr_nat-zh-cn-16k-common-vocab8404-pytorch/example/asr_example.wav"
with wave.open(wav_path, "rb") as wav_file:
params = wav_file.getparams()
frames = wav_file.readframes(wav_file.getnframes())
audio_bytes = bytes(frames)
stride = int(60 * 10 / 10 / 1000 * 16000 * 2)
chunk_num = (len(audio_bytes) - 1) // stride + 1
    # create a recognizer
rcg = Funasr_websocket_recognizer()
# loop to send chunk
for i in range(chunk_num):
beg = i * stride
data = audio_bytes[beg:beg + stride]
text = rcg.feed_chunk(data,wait_time=0.02)
if len(text)>0:
print("text",text)
time.sleep(0.05)
# get last message
text = rcg.close(timeout=3)
print("text",text)
... ...