test_integrated_asr.py
5.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# AIfeng/2025-07-02 14:01:37
# 测试集成到主服务的ASR功能
import asyncio
import websockets
import json
import base64
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ASRIntegrationTester:
def __init__(self, server_url="ws://localhost:8010/ws"):
self.server_url = server_url
self.websocket = None
self.sessionid = 12345
async def connect(self):
"""连接到主服务WebSocket"""
try:
self.websocket = await websockets.connect(self.server_url)
logger.info(f"已连接到主服务: {self.server_url}")
# 发送登录消息
login_message = {
"type": "login",
"sessionid": self.sessionid
}
await self.websocket.send(json.dumps(login_message))
# 等待登录确认
response = await self.websocket.recv()
login_response = json.loads(response)
logger.info(f"登录响应: {login_response}")
return True
except Exception as e:
logger.error(f"连接失败: {e}")
return False
async def start_asr_recognition(self):
"""开始ASR识别"""
try:
start_message = {
"type": "start_recognition"
}
await self.websocket.send(json.dumps(start_message))
logger.info("已发送开始识别请求")
# 等待响应
response = await self.websocket.recv()
start_response = json.loads(response)
logger.info(f"开始识别响应: {start_response}")
except Exception as e:
logger.error(f"开始识别失败: {e}")
async def send_test_audio_data(self):
"""发送测试音频数据"""
try:
# 模拟音频数据(实际应用中应该是真实的音频数据)
test_audio = b"\x00\x01\x02\x03" * 1000 # 模拟音频字节
audio_base64 = base64.b64encode(test_audio).decode('utf-8')
audio_message = {
"type": "audio_data",
"audio_data": audio_base64
}
await self.websocket.send(json.dumps(audio_message))
logger.info(f"已发送音频数据: {len(test_audio)} bytes")
except Exception as e:
logger.error(f"发送音频数据失败: {e}")
async def stop_asr_recognition(self):
"""停止ASR识别"""
try:
stop_message = {
"type": "stop_recognition"
}
await self.websocket.send(json.dumps(stop_message))
logger.info("已发送停止识别请求")
# 等待响应
response = await self.websocket.recv()
stop_response = json.loads(response)
logger.info(f"停止识别响应: {stop_response}")
except Exception as e:
logger.error(f"停止识别失败: {e}")
async def listen_for_results(self, duration=10):
"""监听ASR识别结果"""
try:
logger.info(f"开始监听ASR结果,持续{duration}秒...")
end_time = asyncio.get_event_loop().time() + duration
while asyncio.get_event_loop().time() < end_time:
try:
# 设置短超时,避免阻塞
response = await asyncio.wait_for(self.websocket.recv(), timeout=1.0)
message = json.loads(response)
if message.get('type') == 'asr_result':
logger.info(f"收到ASR结果: {message}")
elif message.get('type') == 'asr_error':
logger.error(f"ASR错误: {message}")
else:
logger.info(f"收到其他消息: {message}")
except asyncio.TimeoutError:
continue
except Exception as e:
logger.error(f"接收消息时出错: {e}")
break
except Exception as e:
logger.error(f"监听结果时出错: {e}")
async def disconnect(self):
"""断开连接"""
if self.websocket:
await self.websocket.close()
logger.info("已断开连接")
async def run_test(self):
"""运行完整测试"""
logger.info("=== ASR集成功能测试开始 ===")
# 1. 连接到主服务
if not await self.connect():
logger.error("连接失败,测试终止")
return
try:
# 2. 开始ASR识别
await self.start_asr_recognition()
await asyncio.sleep(2)
# 3. 发送测试音频数据
for i in range(3):
await self.send_test_audio_data()
await asyncio.sleep(1)
# 4. 监听结果
await self.listen_for_results(duration=5)
# 5. 停止ASR识别
await self.stop_asr_recognition()
except Exception as e:
logger.error(f"测试过程中出错: {e}")
finally:
# 6. 断开连接
await self.disconnect()
logger.info("=== ASR集成功能测试结束 ===")
async def main():
"""主函数"""
tester = ASRIntegrationTester()
await tester.run_test()
if __name__ == "__main__":
asyncio.run(main())