Toggle navigation
Toggle navigation
This project
Loading...
Sign in
冯杨
/
liveTalking
Go to a project
Toggle navigation
Projects
Groups
Snippets
Help
Toggle navigation pinning
Project
Activity
Repository
Pipelines
Graphs
Issues
0
Merge Requests
0
Wiki
Network
Create a new issue
Builds
Commits
Authored by
冯杨
2025-07-24 15:17:04 +0800
Browse Files
Options
Browse Files
Download
Email Patches
Plain Diff
Commit
302078042a5e4e739ec0dcf5384df4c7e5233e05
30207804
1 parent
c5e387ed
测试文件提交
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
152 additions
and
118 deletions
core/unified_websocket_manager.py
core/websocket_router.py
web/webrtcapichat.html
core/unified_websocket_manager.py
View file @
3020780
...
...
@@ -26,14 +26,14 @@ class WebSocketSession:
self
.
metadata
=
{}
def
__eq__
(
self
,
other
):
"""基于
websocket对象
判断会话是否相等"""
"""基于
session_id
判断会话是否相等"""
if
not
isinstance
(
other
,
WebSocketSession
):
return
False
return
self
.
websocket
is
other
.
websocket
return
self
.
session_id
==
other
.
session_id
def
__hash__
(
self
):
"""基于websocket对象的id生成哈希值"""
return
hash
(
id
(
self
.
websocket
))
"""基于session_id生成哈希值"""
return
hash
(
self
.
session_id
)
def
is_alive
(
self
)
->
bool
:
"""检查连接是否存活"""
...
...
@@ -86,7 +86,7 @@ class UnifiedWebSocketManager:
"""统一WebSocket管理器"""
def
__init__
(
self
):
self
.
_sessions
:
Dict
[
str
,
Set
[
WebSocketSession
]]
=
{}
# session_id -> WebSocketSession集合
self
.
_sessions
:
Dict
[
str
,
WebSocketSession
]
=
{}
# session_id -> WebSocketSession (单连接)
self
.
_websockets
:
Dict
[
web
.
WebSocketResponse
,
WebSocketSession
]
=
{}
# websocket -> session映射
self
.
_message_handlers
:
Dict
[
str
,
Callable
]
=
{}
# 消息类型处理器
self
.
_event_handlers
:
Dict
[
str
,
List
[
Callable
]]
=
{}
# 事件处理器
...
...
@@ -121,7 +121,7 @@ class UnifiedWebSocketManager:
logger
.
error
(
f
'事件处理器执行失败 {event_type}: {e}'
)
def
add_session
(
self
,
session_id
:
str
,
websocket
:
web
.
WebSocketResponse
)
->
WebSocketSession
:
"""添加WebSocket会话"""
"""添加WebSocket会话
- 单连接模式,新连接会替换旧连接
"""
with
self
.
_lock
:
# 检查是否已存在相同的websocket连接
if
websocket
in
self
.
_websockets
:
...
...
@@ -131,22 +131,25 @@ class UnifiedWebSocketManager:
session
=
WebSocketSession
(
session_id
,
websocket
)
# 初始化会话集合
if
session_id
not
in
self
.
_sessions
:
self
.
_sessions
[
session_id
]
=
set
()
# 检查Set添加前后的大小变化
before_count
=
len
(
self
.
_sessions
[
session_id
])
self
.
_sessions
[
session_id
]
.
add
(
session
)
after_count
=
len
(
self
.
_sessions
[
session_id
])
# 如果已存在该session_id的连接,先清理旧连接
if
session_id
in
self
.
_sessions
:
old_session
=
self
.
_sessions
[
session_id
]
# 从websockets映射中移除旧连接
if
old_session
.
websocket
in
self
.
_websockets
:
del
self
.
_websockets
[
old_session
.
websocket
]
# 尝试关闭旧连接
try
:
if
not
old_session
.
websocket
.
closed
:
asyncio
.
create_task
(
old_session
.
websocket
.
close
())
logger
.
info
(
f
"[Session:{session_id}] 已关闭旧连接"
)
except
Exception
as
e
:
logger
.
warning
(
f
"[Session:{session_id}] 关闭旧连接时出错: {e}"
)
# 添加新连接 (单连接模式)
self
.
_sessions
[
session_id
]
=
session
self
.
_websockets
[
websocket
]
=
session
logger
.
info
(
f
'[Session:{session_id}] 添加WebSocket会话 (WebSocket={id(websocket)}), 连接数变化: {before_count} -> {after_count}'
)
# 如果Set大小没有变化,说明可能存在重复
if
before_count
==
after_count
:
logger
.
warning
(
f
'[Session:{session_id}] 检测到可能的重复会话添加!Set大小未变化'
)
logger
.
info
(
f
'[Session:{session_id}] 添加WebSocket会话 (单连接模式) (WebSocket={id(websocket)})'
)
return
session
...
...
@@ -157,11 +160,9 @@ class UnifiedWebSocketManager:
session
=
self
.
_websockets
[
websocket
]
session_id
=
session
.
session_id
# 从会话集合中移除
if
session_id
in
self
.
_sessions
:
self
.
_sessions
[
session_id
]
.
discard
(
session
)
if
not
self
.
_sessions
[
session_id
]:
# 如果集合为空,删除键
del
self
.
_sessions
[
session_id
]
# 从会话映射中移除(单连接模式)
if
session_id
in
self
.
_sessions
and
self
.
_sessions
[
session_id
]
==
session
:
del
self
.
_sessions
[
session_id
]
# 从websocket映射中移除
del
self
.
_websockets
[
websocket
]
...
...
@@ -174,49 +175,57 @@ class UnifiedWebSocketManager:
"""获取WebSocket会话"""
return
self
.
_websockets
.
get
(
websocket
)
def
get_sessions_by_id
(
self
,
session_id
:
str
)
->
Set
[
WebSocketSession
]:
"""根据会话ID获取所有WebSocket会话"""
def
get_sessions_by_id
(
self
,
session_id
:
str
)
->
Optional
[
WebSocketSession
]:
"""根据会话ID获取WebSocket会话(单连接模式)"""
with
self
.
_lock
:
# 尝试使用原始session_id查找
sessions
=
self
.
_sessions
.
get
(
session_id
,
set
())
if
sessions
:
return
sessions
.
copy
()
session
=
self
.
_sessions
.
get
(
session_id
)
if
session
:
return
session
# 如果是字符串类型但存储的是整数类型,尝试转换
if
isinstance
(
session_id
,
str
)
and
session_id
.
isdigit
():
int_session_id
=
int
(
session_id
)
sessions
=
self
.
_sessions
.
get
(
int_session_id
,
set
())
if
sessions
:
return
sessions
.
copy
()
session
=
self
.
_sessions
.
get
(
int_session_id
)
if
session
:
return
session
# 如果是整数类型但存储的是字符串类型,尝试转换
elif
isinstance
(
session_id
,
int
):
str_session_id
=
str
(
session_id
)
sessions
=
self
.
_sessions
.
get
(
str_session_id
,
set
())
if
sessions
:
return
sessions
.
copy
()
session
=
self
.
_sessions
.
get
(
str_session_id
)
if
session
:
return
session
return
set
()
return
None
def
_update_session_id
(
self
,
websocket
:
web
.
WebSocketResponse
,
old_session_id
:
str
,
new_session_id
:
str
):
"""更新WebSocket会话的session_id"""
"""更新WebSocket会话的session_id
(单连接模式)
"""
with
self
.
_lock
:
if
websocket
in
self
.
_websockets
:
session
=
self
.
_websockets
[
websocket
]
# 从旧的session_id集合中移除
if
old_session_id
in
self
.
_sessions
:
self
.
_sessions
[
old_session_id
]
.
discard
(
session
)
if
not
self
.
_sessions
[
old_session_id
]:
# 如果集合为空,删除键
del
self
.
_sessions
[
old_session_id
]
# 从旧的session_id映射中移除(只有当前session匹配时)
if
old_session_id
in
self
.
_sessions
and
self
.
_sessions
[
old_session_id
]
==
session
:
del
self
.
_sessions
[
old_session_id
]
# 更新session的session_id
session
.
session_id
=
new_session_id
# 添加到新的session_id集合
if
new_session_id
not
in
self
.
_sessions
:
self
.
_sessions
[
new_session_id
]
=
set
()
self
.
_sessions
[
new_session_id
]
.
add
(
session
)
# 如果新session_id已存在,先清理旧连接
if
new_session_id
in
self
.
_sessions
:
old_session
=
self
.
_sessions
[
new_session_id
]
if
old_session
.
websocket
in
self
.
_websockets
:
del
self
.
_websockets
[
old_session
.
websocket
]
try
:
if
not
old_session
.
websocket
.
closed
:
asyncio
.
create_task
(
old_session
.
websocket
.
close
())
logger
.
info
(
f
"[Session:{new_session_id}] 更新时关闭旧连接"
)
except
Exception
as
e
:
logger
.
warning
(
f
"[Session:{new_session_id}] 更新时关闭旧连接出错: {e}"
)
# 添加到新的session_id映射(单连接模式)
self
.
_sessions
[
new_session_id
]
=
session
logger
.
info
(
f
'[Session] 更新会话ID: {old_session_id} -> {new_session_id}'
)
return
True
...
...
@@ -225,39 +234,32 @@ class UnifiedWebSocketManager:
async
def
broadcast_raw_message_to_session
(
self
,
session_id
:
str
,
message
:
Dict
,
source
:
str
=
"原数据"
)
->
int
:
"""直接广播原始消息到指定会话的所有WebSocket连接"""
# 确保session_id为字符串类型,保持一致性
# 确保session_id为字符串类型,保持一致性
if
isinstance
(
session_id
,
int
):
session_id
=
str
(
session_id
)
elif
not
isinstance
(
session_id
,
str
):
session_id
=
str
(
session_id
)
sessions
=
self
.
get_sessions_by_id
(
session_id
)
if
not
sessions
:
session
=
self
.
get_sessions_by_id
(
session_id
)
if
not
session
:
logger
.
warning
(
f
'[Session:{session_id}] 没有找到WebSocket连接'
)
return
0
# 详细调试日志:显示会话详情
logger
.
info
(
f
'[Session:{session_id}] 开始广播消息,找到 {len(sessions)} 个连接'
)
for
i
,
session
in
enumerate
(
sessions
):
logger
.
info
(
f
'[Session:{session_id}] 连接{i+1}: WebSocket={id(session.websocket)}, 创建时间={session.created_at}, 存活状态={session.is_alive()}'
)
logger
.
info
(
f
'[Session:{session_id}] 开始广播消息,找到 1 个连接'
)
logger
.
info
(
f
'[Session:{session_id}] 连接: WebSocket={id(session.websocket)}, 创建时间={session.created_at}, 存活状态={session.is_alive()}'
)
success_count
=
0
failed_sessions
=
[]
for
i
,
session
in
enumerate
(
sessions
):
logger
.
info
(
f
'[Session:{session_id}] 正在向连接{i+1}发送消息 (WebSocket={id(session.websocket)})'
)
if
await
session
.
send_message
(
message
):
success_count
+=
1
logger
.
info
(
f
'[Session:{session_id}] 连接{i+1}发送成功'
)
else
:
failed_sessions
.
append
(
session
)
logger
.
warning
(
f
'[Session:{session_id}] 连接{i+1}发送失败'
)
# 清理失败的连接
for
session
in
failed_sessions
:
logger
.
info
(
f
'[Session:{session_id}] 正在发送消息 (WebSocket={id(session.websocket)})'
)
if
await
session
.
send_message
(
message
):
success_count
=
1
logger
.
info
(
f
'[Session:{session_id}] 发送成功'
)
else
:
logger
.
warning
(
f
'[Session:{session_id}] 发送失败'
)
# 清理失败的连接
self
.
remove_session
(
session
.
websocket
)
logger
.
info
(
f
'[Session:{session_id}] 广播原始消息完成: 成功{success_count}/总计
{len(sessions)}, 失败{len(failed_sessions)}
'
)
logger
.
info
(
f
'[Session:{session_id}] 广播原始消息完成: 成功{success_count}/总计
1
'
)
return
success_count
async
def
broadcast_to_session
(
self
,
session_id
:
str
,
message_type
:
str
,
content
:
Any
,
...
...
@@ -269,11 +271,16 @@ class UnifiedWebSocketManager:
elif
not
isinstance
(
session_id
,
str
):
session_id
=
str
(
session_id
)
sessions
=
self
.
get_sessions_by_id
(
session_id
)
if
not
sessions
:
session
=
self
.
get_sessions_by_id
(
session_id
)
if
not
session
:
logger
.
warning
(
f
'[Session:{session_id}] 没有找到WebSocket连接'
)
return
0
# 详细调试日志:显示会话详情
logger
.
info
(
f
'[Session:{session_id}] 开始广播消息,找到 1 个连接'
)
logger
.
info
(
f
'[Session:{session_id}] 连接: WebSocket={id(session.websocket)}, 创建时间={session.created_at}, 存活状态={session.is_alive()}'
)
message
=
{
"type"
:
message_type
,
"session_id"
:
session_id
,
...
...
@@ -284,19 +291,17 @@ class UnifiedWebSocketManager:
}
success_count
=
0
failed_sessions
=
[]
for
session
in
sessions
:
if
await
session
.
send_message
(
message
):
success_count
+=
1
else
:
failed_sessions
.
append
(
session
)
# 清理失败的连接
for
session
in
failed_sessions
:
logger
.
info
(
f
'[Session:{session_id}] 正在发送消息 (WebSocket={id(session.websocket)})'
)
if
await
session
.
send_message
(
message
):
success_count
=
1
logger
.
info
(
f
'[Session:{session_id}] 发送成功'
)
else
:
logger
.
warning
(
f
'[Session:{session_id}] 发送失败'
)
# 清理失败的连接
self
.
remove_session
(
session
.
websocket
)
logger
.
info
(
f
'[Session:{session_id}] 广播
消息成功: {success_count}/{len(sessions)}
'
)
logger
.
info
(
f
'[Session:{session_id}] 广播
原始消息完成: 成功{success_count}/总计1
'
)
return
success_count
async
def
broadcast_to_all
(
self
,
message_type
:
str
,
content
:
Any
,
...
...
@@ -324,7 +329,7 @@ class UnifiedWebSocketManager:
return
len
(
self
.
_websockets
)
def
get_session_stats
(
self
)
->
Dict
[
str
,
Any
]:
"""获取会话统计信息"""
"""获取会话统计信息
(单连接模式)
"""
with
self
.
_lock
:
stats
=
{
"total_sessions"
:
len
(
self
.
_sessions
),
...
...
@@ -332,17 +337,15 @@ class UnifiedWebSocketManager:
"session_details"
:
{}
}
for
session_id
,
session
s
in
self
.
_sessions
.
items
():
for
session_id
,
session
in
self
.
_sessions
.
items
():
stats
[
"session_details"
][
session_id
]
=
{
"connection_count"
:
len
(
sessions
),
"connections"
:
[
{
"created_at"
:
session
.
created_at
,
"last_ping"
:
session
.
last_ping
,
"is_alive"
:
session
.
is_alive
(),
"metadata"
:
session
.
metadata
}
for
session
in
sessions
]
"connection_count"
:
1
,
# 单连接模式固定为1
"connection"
:
{
"created_at"
:
session
.
created_at
,
"last_ping"
:
session
.
last_ping
,
"is_alive"
:
session
.
is_alive
(),
"metadata"
:
session
.
metadata
}
}
return
stats
...
...
core/websocket_router.py
View file @
3020780
...
...
@@ -69,6 +69,8 @@ class WebSocketRouter:
digital_human_service
=
get_digital_human_service
()
self
.
service_registry
.
register_service
(
digital_human_service
)
# 注册WSA服务
from
.wsa_websocket_service
import
WSAWebSocketService
,
initialize_wsa_service
wsa_service
=
WSAWebSocketService
(
self
.
manager
)
...
...
@@ -157,7 +159,7 @@ class WebSocketRouter:
digital_human_service
=
self
.
service_registry
.
get_service
(
"digital_human_service"
)
if
digital_human_service
:
stats
[
"digital_human_stats"
]
=
digital_human_service
.
get_digital_human_stats
()
return
stats
def
setup_routes
(
self
,
app
:
web
.
Application
,
path
:
str
=
'/ws'
):
...
...
web/webrtcapichat.html
View file @
3020780
...
...
@@ -1541,7 +1541,21 @@
// 获取选择的消息类型,默认为chat
var
messageType
=
document
.
getElementById
(
'message-type'
)
?
document
.
getElementById
(
'message-type'
).
value
:
'chat'
;
// 发送消息到服务器,不再直接添加到界面,等待WebSocket推送
// // 立即将用户消息显示在对话框右侧
// var senderLabel = '用户';
// var messageMode = 'text';
// if (messageType === 'chat') {
// senderLabel = '用户';
// messageMode = 'chat';
// } else if (messageType === 'echo') {
// senderLabel = '用户';
// messageMode = 'echo';
// }
// // 添加用户消息到界面
// addMessage(message, 'right', senderLabel, messageMode, '', 'web');
// 发送消息到服务器
var
requestData
=
{
text
:
message
,
type
:
messageType
,
...
...
@@ -1591,6 +1605,7 @@
$
(
'#clear-storage'
).
click
(
function
()
{
if
(
confirm
(
'确定要清理所有本地聊天记录吗?此操作不可恢复!'
))
{
ChatStorage
.
clearStorage
();
localStorage
.
removeItem
(
'chatHistory'
);
const
chatMessages
=
document
.
getElementById
(
"chatMessages"
);
if
(
chatMessages
)
{
chatMessages
.
innerHTML
=
''
;
...
...
@@ -2030,17 +2045,6 @@
saveChatHistory
();
}
// 清空聊天记录函数
function
clearChatHistory
()
{
const
chatMessages
=
document
.
getElementById
(
"chatMessages"
);
if
(
chatMessages
)
{
chatMessages
.
innerHTML
=
""
;
}
// 清理新的ChatStorage数据
ChatStorage
.
clearStorage
();
// 兼容清理旧的localStorage项
localStorage
.
removeItem
(
'chatHistory'
);
}
// 切换对话框显示/隐藏
function
toggleChatOverlay
()
{
...
...
@@ -2380,6 +2384,7 @@
if
(
messageData
.
type
===
'login_success'
)
{
console
.
log
(
'WebSocket登录成功:'
,
messageData
.
message
);
console
.
log
(
'登录成功的sessionid:'
,
messageData
.
sessionid
);
updateConnectionStatus
(
'connected'
,
`聊天服务器已连接
(
会话
ID
:
$
{
messageData
.
sessionid
||
document
.
getElementById
(
'sessionid'
).
value
})
`
);
return
;
}
...
...
@@ -2387,6 +2392,11 @@
// 处理心跳响应
if
(
messageData
.
type
===
'pong'
)
{
console
.
log
(
'收到心跳响应'
);
// 心跳正常时确保连接状态显示为已连接
var
currentSessionId
=
document
.
getElementById
(
'sessionid'
).
value
;
if
(
currentSessionId
&&
parseInt
(
currentSessionId
)
!==
0
)
{
updateConnectionStatus
(
'connected'
,
`聊天服务器已连接
(
会话
ID
:
$
{
currentSessionId
})
`
);
}
return
;
}
...
...
@@ -2394,14 +2404,32 @@
if
(
messageData
.
type
===
'chat_message'
)
{
console
.
log
(
'收到聊天消息:'
,
messageData
);
var
messageContent
=
messageData
.
content
||
messageData
.
message
||
messageData
.
text
||
''
;
var
messageType
=
messageData
.
message_type
||
'text'
;
var
sender
=
messageData
.
sender
||
'unknown'
;
// 正确解析嵌套的content对象
var
contentObj
=
messageData
.
content
||
{};
var
messageContent
=
''
;
var
messageType
=
'text'
;
var
sender
=
'unknown'
;
var
sessionId
=
messageData
.
session_id
;
var
modelInfo
=
messageData
.
model_info
||
''
;
var
requestSource
=
messageData
.
request_source
||
''
;
var
modelInfo
=
''
;
var
requestSource
=
''
;
var
timestamp
=
messageData
.
timestamp
||
new
Date
().
toISOString
();
// 如果content是对象,从中提取字段
if
(
typeof
contentObj
===
'object'
&&
contentObj
!==
null
)
{
messageContent
=
contentObj
.
content
||
contentObj
.
message
||
contentObj
.
text
||
''
;
messageType
=
contentObj
.
message_type
||
'text'
;
sender
=
contentObj
.
source
||
messageData
.
sender
||
'unknown'
;
modelInfo
=
contentObj
.
model_info
||
''
;
requestSource
=
contentObj
.
request_source
||
''
;
}
else
{
// 如果content是字符串,直接使用
messageContent
=
contentObj
||
messageData
.
message
||
messageData
.
text
||
''
;
messageType
=
messageData
.
message_type
||
'text'
;
sender
=
messageData
.
sender
||
'unknown'
;
modelInfo
=
messageData
.
model_info
||
''
;
requestSource
=
messageData
.
request_source
||
''
;
}
// 判断消息方向和样式
var
alignment
=
'left'
;
var
senderLabel
=
'数字人回复'
;
...
...
@@ -2516,17 +2544,18 @@
// 在连接关闭前发送WSA注销消息到主服务
if
(
ws
.
readyState
===
WebSocket
.
OPEN
)
{
try
{
var
wsaUnregisterMessage
=
{
type
:
'wsa_unregister_web'
,
username
:
getTemporaryUsername
(),
timestamp
:
Date
.
now
()
};
console
.
log
(
'发送WSA注销消息到主服务:'
,
wsaUnregisterMessage
);
ws
.
send
(
JSON
.
stringify
(
wsaUnregisterMessage
));
}
catch
(
error
)
{
console
.
error
(
'发送WSA注销消息失败:'
,
error
);
}
// try {
// var wsaUnregisterMessage = {
// type: 'wsa_unregister_web',
// username: getTemporaryUsername(),
// timestamp: Date.now()
// };
// console.log('发送WSA注销消息到主服务:', wsaUnregisterMessage);
// ws.send(JSON.stringify(wsaUnregisterMessage));
// } catch (error) {
// console.error('发送WSA注销消息失败:', error);
// }
console
.
debug
(
'close触发:'
,
Date
.
now
());
}
updateConnectionStatus
(
'disconnected'
,
'WebSocket连接已断开,正在重连...'
);
...
...
Please
register
or
login
to post a comment