# test_websocket_server.py (5.95 KB)
#!/usr/bin/env python3
# AIfeng/2025-07-02 14:01:37
# WebSocket通信测试服务器

import asyncio
import json
import time
import weakref
from aiohttp import web, WSMsgType
import aiohttp_cors
from typing import Dict

# Global state
# Registry of WebSocket connections, keyed by session id. Each value is a
# weakref.WeakSet, so a connection object that gets garbage-collected drops
# out of its set without explicit removal.
websocket_connections: Dict[int, weakref.WeakSet] = {}  # sessionid -> WebSocket connections

# WebSocket message push helper
async def broadcast_message_to_session(sessionid: int, message_type: str, content: str, source: str = "测试服务器"):
    """Push a ``chat_message`` frame to every WebSocket connection of one session.

    Args:
        sessionid: Key looked up in the module-level ``websocket_connections``.
        message_type: Stored under ``data.message_type`` in the outgoing frame.
        content: Stored under ``data.content``.
        source: Stored under ``data.source``; identifies who produced the message.

    Returns:
        None. When the session has no entry in ``websocket_connections`` the
        function logs that fact and returns without sending anything.

    Delivery is best-effort: a failure on one connection is logged and the
    remaining connections are still attempted. Connections whose ``closed``
    attribute is true are skipped.
    """
    if sessionid not in websocket_connections:
        print(f'[SessionID:{sessionid}] No WebSocket connections found')
        return

    # Serialize once up front: every connection receives the same frame, so
    # there is no reason to re-encode it inside the loop.
    try:
        payload = json.dumps({
            "type": "chat_message",
            "data": {
                "sessionid": sessionid,
                "message_type": message_type,
                "content": content,
                "source": source,
                "timestamp": time.time(),
            },
        })
    except (TypeError, ValueError) as e:
        # Keep the function non-raising for unserializable input, as it was
        # when serialization happened inside the per-connection try block.
        print(f'[SessionID:{sessionid}] Failed to send WebSocket message: {e}')
        return

    # Snapshot the set: it may change while we are suspended in send_str().
    connections = list(websocket_connections[sessionid])
    print(f'[SessionID:{sessionid}] Broadcasting to {len(connections)} connections')

    for ws in connections:
        try:
            if not ws.closed:
                await ws.send_str(payload)
                print(f'[SessionID:{sessionid}] Message sent to WebSocket: {message_type}')
        except Exception as e:
            # One broken connection must not prevent delivery to the others.
            print(f'[SessionID:{sessionid}] Failed to send WebSocket message: {e}')

# WebSocket registry cleanup helper
def _unregister_websocket(sessionid, ws):
    """Remove *ws* from the session's connection set; drop the set once empty."""
    connections = websocket_connections.get(sessionid)
    if connections is None:
        return
    connections.discard(ws)
    if not connections:
        # Do not keep empty sets around for sessions nobody is connected to.
        websocket_connections.pop(sessionid, None)


# WebSocket handler
async def websocket_handler(request):
    """Serve one WebSocket client until it disconnects.

    Text frames are parsed as JSON and dispatched on their ``type`` field:

    * ``login`` registers this socket under ``sessionid`` (default ``0``) in
      ``websocket_connections`` and replies with a ``login_success`` frame.
      A later login under a different id moves the socket to the new session.
    * ``ping`` is answered with a ``pong`` frame.

    Invalid JSON and errors while handling a single frame are logged and the
    connection stays open. An ``ERROR`` frame ends the loop.

    Args:
        request: The incoming aiohttp request being upgraded to a WebSocket.

    Returns:
        The ``web.WebSocketResponse`` used for the connection.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    sessionid = None     # session id this socket last logged in with (for logging)
    registered = False   # separate flag: a client may legitimately log in with a null id
    print('New WebSocket connection established')

    try:
        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                try:
                    data = json.loads(msg.data)
                    print(f'Received WebSocket message: {data}')

                    if data.get('type') == 'login':
                        new_sessionid = data.get('sessionid', 0)

                        # Create the session's connection set on first use.
                        # An unhashable id raises TypeError here, before any
                        # state changes, and is reported by the handler below.
                        if new_sessionid not in websocket_connections:
                            websocket_connections[new_sessionid] = weakref.WeakSet()

                        # A repeated login under another id must not leave
                        # this socket subscribed to the previous session.
                        if registered and sessionid != new_sessionid:
                            _unregister_websocket(sessionid, ws)

                        websocket_connections[new_sessionid].add(ws)
                        sessionid = new_sessionid
                        registered = True

                        print(f'[SessionID:{sessionid}] WebSocket client logged in')

                        # Acknowledge the login.
                        await ws.send_str(json.dumps({
                            "type": "login_success",
                            "sessionid": sessionid,
                            "message": "WebSocket连接成功"
                        }))

                    elif data.get('type') == 'ping':
                        # Heartbeat check.
                        await ws.send_str(json.dumps({"type": "pong"}))
                        print('Sent pong response')

                except json.JSONDecodeError:
                    print('Invalid JSON received from WebSocket')
                except Exception as e:
                    print(f'Error processing WebSocket message: {e}')

            elif msg.type == WSMsgType.ERROR:
                print(f'WebSocket error: {ws.exception()}')
                break

    except Exception as e:
        print(f'WebSocket connection error: {e}')
    finally:
        # Unregister explicitly instead of waiting for the WeakSet to notice
        # that the connection object has been garbage-collected.
        if registered:
            _unregister_websocket(sessionid, ws)
        if sessionid is not None:
            print(f'[SessionID:{sessionid}] WebSocket connection closed')
        else:
            print('WebSocket connection closed')

    return ws

# Simulated "human" endpoint
async def human(request):
    """Handle POST /human: relay the posted text to the session's WebSockets.

    The JSON body may carry ``sessionid`` (default ``0``), ``text`` (default
    ``''``) and ``type`` (default ``'echo'``). The text is first broadcast
    with source "用户"; an ``echo`` request then broadcasts it again as an
    echo, and a ``chat`` request broadcasts a canned AI reply instead.

    Args:
        request: The incoming aiohttp request with a JSON body.

    Returns:
        A JSON ``web.Response``: ``code`` 0 on success, or ``code`` -1 with
        the exception text under ``msg`` when anything in the handler raised.
    """
    try:
        payload = await request.json()
        sessionid = payload.get('sessionid', 0)
        text = payload.get('text', '')
        kind = payload.get('type', 'echo')

        print(f'[SessionID:{sessionid}] Received {kind} message: {text}')

        # Always relay what the user sent.
        await broadcast_message_to_session(sessionid, kind, text, "用户")

        # Pick the follow-up broadcast, if this message type has one.
        follow_up = None
        if kind == 'echo':
            follow_up = ('echo', text, "回音")
        elif kind == 'chat':
            follow_up = ('chat', f"这是对 '{text}' 的AI回复", "AI助手")

        if follow_up is not None:
            await broadcast_message_to_session(sessionid, *follow_up)

        body = {"code": 0, "data": "ok", "message": "消息已处理并推送"}
    except Exception as e:
        print(f'Error in human endpoint: {e}')
        body = {"code": -1, "msg": str(e)}

    return web.Response(content_type="application/json", text=json.dumps(body))

# Application factory
def create_app():
    """Build the aiohttp application with its routes and CORS configuration.

    Routes: ``POST /human``, ``GET /ws`` and static files served at ``/``
    from the ``web`` directory. Every registered route is then added to an
    aiohttp_cors configuration whose ``"*"`` default allows credentials and
    exposes/allows all headers.

    Returns:
        The configured ``web.Application``.
    """
    application = web.Application()

    # Register routes.
    router = application.router
    router.add_post("/human", human)
    router.add_get("/ws", websocket_handler)
    router.add_static('/', path='web')

    # Default CORS policy applied to any origin.
    any_origin_policy = aiohttp_cors.ResourceOptions(
        allow_credentials=True,
        expose_headers="*",
        allow_headers="*",
    )
    cors_config = aiohttp_cors.setup(application, defaults={"*": any_origin_policy})

    # Enable CORS on every route; iterate a snapshot of the route table.
    for registered_route in tuple(router.routes()):
        cors_config.add(registered_route)

    return application

if __name__ == '__main__':
    # Script entry point: build the app, print where to reach it, then serve.
    app = create_app()
    print(
        'Starting WebSocket test server on http://localhost:8000',
        'WebSocket endpoint: ws://localhost:8000/ws',
        'HTTP endpoint: http://localhost:8000/human',
        'Test page: http://localhost:8000/websocket_test.html',
        sep='\n',
    )

    # Listen on all interfaces, port 8000.
    web.run_app(app, host='0.0.0.0', port=8000)