test_integrated_asr.py 5.61 KB
# AIfeng/2025-07-02 14:01:37
# 测试集成到主服务的ASR功能

import asyncio
import websockets
import json
import base64
import logging

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ASRIntegrationTester:
    def __init__(self, server_url="ws://localhost:8010/ws"):
        self.server_url = server_url
        self.websocket = None
        self.sessionid = 12345
        
    async def connect(self):
        """连接到主服务WebSocket"""
        try:
            self.websocket = await websockets.connect(self.server_url)
            logger.info(f"已连接到主服务: {self.server_url}")
            
            # 发送登录消息
            login_message = {
                "type": "login",
                "sessionid": self.sessionid
            }
            await self.websocket.send(json.dumps(login_message))
            
            # 等待登录确认
            response = await self.websocket.recv()
            login_response = json.loads(response)
            logger.info(f"登录响应: {login_response}")
            
            return True
        except Exception as e:
            logger.error(f"连接失败: {e}")
            return False
    
    async def start_asr_recognition(self):
        """开始ASR识别"""
        try:
            start_message = {
                "type": "start_recognition"
            }
            await self.websocket.send(json.dumps(start_message))
            logger.info("已发送开始识别请求")
            
            # 等待响应
            response = await self.websocket.recv()
            start_response = json.loads(response)
            logger.info(f"开始识别响应: {start_response}")
            
        except Exception as e:
            logger.error(f"开始识别失败: {e}")
    
    async def send_test_audio_data(self):
        """发送测试音频数据"""
        try:
            # 模拟音频数据(实际应用中应该是真实的音频数据)
            test_audio = b"\x00\x01\x02\x03" * 1000  # 模拟音频字节
            audio_base64 = base64.b64encode(test_audio).decode('utf-8')
            
            audio_message = {
                "type": "audio_data",
                "audio_data": audio_base64
            }
            await self.websocket.send(json.dumps(audio_message))
            logger.info(f"已发送音频数据: {len(test_audio)} bytes")
            
        except Exception as e:
            logger.error(f"发送音频数据失败: {e}")
    
    async def stop_asr_recognition(self):
        """停止ASR识别"""
        try:
            stop_message = {
                "type": "stop_recognition"
            }
            await self.websocket.send(json.dumps(stop_message))
            logger.info("已发送停止识别请求")
            
            # 等待响应
            response = await self.websocket.recv()
            stop_response = json.loads(response)
            logger.info(f"停止识别响应: {stop_response}")
            
        except Exception as e:
            logger.error(f"停止识别失败: {e}")
    
    async def listen_for_results(self, duration=10):
        """监听ASR识别结果"""
        try:
            logger.info(f"开始监听ASR结果,持续{duration}秒...")
            end_time = asyncio.get_event_loop().time() + duration
            
            while asyncio.get_event_loop().time() < end_time:
                try:
                    # 设置短超时,避免阻塞
                    response = await asyncio.wait_for(self.websocket.recv(), timeout=1.0)
                    message = json.loads(response)
                    
                    if message.get('type') == 'asr_result':
                        logger.info(f"收到ASR结果: {message}")
                    elif message.get('type') == 'asr_error':
                        logger.error(f"ASR错误: {message}")
                    else:
                        logger.info(f"收到其他消息: {message}")
                        
                except asyncio.TimeoutError:
                    continue
                except Exception as e:
                    logger.error(f"接收消息时出错: {e}")
                    break
                    
        except Exception as e:
            logger.error(f"监听结果时出错: {e}")
    
    async def disconnect(self):
        """断开连接"""
        if self.websocket:
            await self.websocket.close()
            logger.info("已断开连接")
    
    async def run_test(self):
        """运行完整测试"""
        logger.info("=== ASR集成功能测试开始 ===")
        
        # 1. 连接到主服务
        if not await self.connect():
            logger.error("连接失败,测试终止")
            return
        
        try:
            # 2. 开始ASR识别
            await self.start_asr_recognition()
            await asyncio.sleep(2)
            
            # 3. 发送测试音频数据
            for i in range(3):
                await self.send_test_audio_data()
                await asyncio.sleep(1)
            
            # 4. 监听结果
            await self.listen_for_results(duration=5)
            
            # 5. 停止ASR识别
            await self.stop_asr_recognition()
            
        except Exception as e:
            logger.error(f"测试过程中出错: {e}")
        finally:
            # 6. 断开连接
            await self.disconnect()
        
        logger.info("=== ASR集成功能测试结束 ===")

async def main():
    """主函数"""
    tester = ASRIntegrationTester()
    await tester.run_test()

if __name__ == "__main__":
    asyncio.run(main())