test_unified_websocket_architecture.py
10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AIfeng/2025-01-27 16:52:46
统一WebSocket架构测试
验证新架构的核心功能是否正常工作
"""
import asyncio
import json
import logging
from typing import Dict, Any
from aiohttp import web, WSMsgType, ClientSession
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class UnifiedWebSocketTester:
"""统一WebSocket架构测试器"""
def __init__(self, server_url: str = "ws://localhost:8010/ws"):
self.server_url = server_url
self.session: ClientSession = None
self.websocket = None
self.received_messages = []
async def connect(self):
"""连接到WebSocket服务器"""
try:
self.session = ClientSession()
self.websocket = await self.session.ws_connect(self.server_url)
logger.info(f"已连接到 {self.server_url}")
return True
except Exception as e:
logger.error(f"连接失败: {e}")
return False
async def disconnect(self):
"""断开连接"""
if self.websocket:
await self.websocket.close()
if self.session:
await self.session.close()
logger.info("已断开连接")
async def send_message(self, message: Dict[str, Any]):
"""发送消息"""
if not self.websocket:
logger.error("WebSocket未连接")
return False
try:
await self.websocket.send_str(json.dumps(message))
logger.info(f"发送消息: {message['type']}")
return True
except Exception as e:
logger.error(f"发送消息失败: {e}")
return False
async def receive_messages(self, timeout: float = 5.0):
"""接收消息"""
if not self.websocket:
return []
messages = []
try:
async for msg in self.websocket:
if msg.type == WSMsgType.TEXT:
data = json.loads(msg.data)
messages.append(data)
logger.info(f"收到消息: {data.get('type', 'unknown')}")
elif msg.type == WSMsgType.ERROR:
logger.error(f"WebSocket错误: {self.websocket.exception()}")
break
elif msg.type == WSMsgType.CLOSE:
logger.info("WebSocket连接已关闭")
break
# 简单超时控制
if len(messages) >= 10: # 限制接收消息数量
break
except Exception as e:
logger.error(f"接收消息失败: {e}")
return messages
async def test_login(self, session_id: str = "test_session_001"):
"""测试登录功能"""
logger.info("=== 测试登录功能 ===")
login_message = {
"type": "login",
"sessionid": session_id,
"data": {
"user_id": "test_user",
"client_type": "test_client"
}
}
success = await self.send_message(login_message)
if success:
# 等待响应
await asyncio.sleep(1)
logger.info("✅ 登录消息发送成功")
else:
logger.error("❌ 登录消息发送失败")
return success
async def test_heartbeat(self):
"""测试心跳功能"""
logger.info("=== 测试心跳功能 ===")
heartbeat_message = {
"type": "heartbeat",
"timestamp": asyncio.get_event_loop().time()
}
success = await self.send_message(heartbeat_message)
if success:
await asyncio.sleep(1)
logger.info("✅ 心跳消息发送成功")
else:
logger.error("❌ 心跳消息发送失败")
return success
async def test_asr_functionality(self):
"""测试ASR功能"""
logger.info("=== 测试ASR功能 ===")
# 测试开始ASR识别
start_asr_message = {
"type": "start_asr_recognition",
"data": {
"language": "zh-CN",
"sample_rate": 16000
}
}
success = await self.send_message(start_asr_message)
if success:
await asyncio.sleep(1)
logger.info("✅ ASR开始识别消息发送成功")
else:
logger.error("❌ ASR开始识别消息发送失败")
return False
# 测试发送音频数据
audio_data_message = {
"type": "asr_audio_data",
"data": {
"audio_data": "fake_audio_data_base64",
"format": "wav"
}
}
success = await self.send_message(audio_data_message)
if success:
await asyncio.sleep(1)
logger.info("✅ ASR音频数据发送成功")
else:
logger.error("❌ ASR音频数据发送失败")
# 测试停止ASR识别
stop_asr_message = {
"type": "stop_asr_recognition"
}
success = await self.send_message(stop_asr_message)
if success:
await asyncio.sleep(1)
logger.info("✅ ASR停止识别消息发送成功")
else:
logger.error("❌ ASR停止识别消息发送失败")
return True
async def test_digital_human_functionality(self):
"""测试数字人功能"""
logger.info("=== 测试数字人功能 ===")
# 测试注册数字人
register_message = {
"type": "register_digital_human",
"data": {
"human_id": "test_human_001",
"name": "测试数字人",
"capabilities": ["speak", "gesture", "emotion"]
}
}
success = await self.send_message(register_message)
if success:
await asyncio.sleep(1)
logger.info("✅ 数字人注册消息发送成功")
else:
logger.error("❌ 数字人注册消息发送失败")
return False
# 测试数字人说话
speak_message = {
"type": "digital_human_speak",
"data": {
"human_id": "test_human_001",
"text": "你好,我是测试数字人",
"voice_id": "default"
}
}
success = await self.send_message(speak_message)
if success:
await asyncio.sleep(1)
logger.info("✅ 数字人说话消息发送成功")
else:
logger.error("❌ 数字人说话消息发送失败")
# 测试获取数字人列表
list_message = {
"type": "get_digital_humans"
}
success = await self.send_message(list_message)
if success:
await asyncio.sleep(1)
logger.info("✅ 获取数字人列表消息发送成功")
else:
logger.error("❌ 获取数字人列表消息发送失败")
return True
async def test_wsa_functionality(self):
"""测试WSA功能"""
logger.info("=== 测试WSA功能 ===")
# 测试注册Web连接
register_web_message = {
"type": "wsa_register_web",
"data": {
"username": "test_web_user"
}
}
success = await self.send_message(register_web_message)
if success:
await asyncio.sleep(1)
logger.info("✅ WSA Web注册消息发送成功")
else:
logger.error("❌ WSA Web注册消息发送失败")
return False
# 测试获取WSA状态
status_message = {
"type": "wsa_get_status"
}
success = await self.send_message(status_message)
if success:
await asyncio.sleep(1)
logger.info("✅ WSA状态查询消息发送成功")
else:
logger.error("❌ WSA状态查询消息发送失败")
return True
async def run_comprehensive_test(self):
"""运行综合测试"""
logger.info("🚀 开始统一WebSocket架构综合测试")
# 连接到服务器
if not await self.connect():
logger.error("❌ 无法连接到服务器,测试终止")
return False
try:
# 启动消息接收任务
receive_task = asyncio.create_task(self.receive_messages())
# 执行各项功能测试
test_results = []
# 基础功能测试
test_results.append(await self.test_login())
test_results.append(await self.test_heartbeat())
# 服务功能测试
test_results.append(await self.test_asr_functionality())
test_results.append(await self.test_digital_human_functionality())
test_results.append(await self.test_wsa_functionality())
# 等待接收响应
await asyncio.sleep(2)
# 取消接收任务
receive_task.cancel()
try:
await receive_task
except asyncio.CancelledError:
pass
# 统计测试结果
passed_tests = sum(test_results)
total_tests = len(test_results)
logger.info(f"\n📊 测试结果统计:")
logger.info(f" 总测试数: {total_tests}")
logger.info(f" 通过测试: {passed_tests}")
logger.info(f" 失败测试: {total_tests - passed_tests}")
logger.info(f" 成功率: {passed_tests/total_tests*100:.1f}%")
if passed_tests == total_tests:
logger.info("🎉 所有测试通过!统一WebSocket架构工作正常")
return True
else:
logger.warning("⚠️ 部分测试失败,请检查服务器状态")
return False
except Exception as e:
logger.error(f"测试过程中发生错误: {e}")
return False
finally:
await self.disconnect()
async def main():
"""主函数"""
tester = UnifiedWebSocketTester()
success = await tester.run_comprehensive_test()
if success:
logger.info("\n✅ 统一WebSocket架构测试完成 - 所有功能正常")
else:
logger.error("\n❌ 统一WebSocket架构测试完成 - 发现问题")
return success
if __name__ == "__main__":
# 运行测试
result = asyncio.run(main())
exit(0 if result else 1)