冯杨

豆包大模型,名称赋予

音频识别asr:使用本地方案funasr(复用的Fay项目中的funasr)
FunASR服务连接测试脚本
用于验证本地FunASR WebSocket服务是否可以正常连接
webrtcapichat.html中对话框做进一步调整,侧边栏增加对话框的透明度调整。暂时设置对话框的背景色差异大些,美学设计暂不考虑。对话框支持隐藏
# -*- coding: utf-8 -*-
"""
AIfeng/2025-01-27
配置管理工具模块
统一管理项目配置参数
"""
import json
import os
from typing import Dict, Any
class ConfigManager:
"""配置管理器"""
def __init__(self):
self._config = {}
self._load_default_config()
self._load_config_files()
def _load_default_config(self):
"""加载默认配置"""
self._config.update({
# FunASR配置
'local_asr_ip': '127.0.0.1',
'local_asr_port': 10197,
# 阿里云NLS配置
'key_ali_nls_key_id': '',
'key_ali_nls_key_secret': '',
'key_ali_nls_app_key': '',
# 其他ASR配置
'asr_timeout': 30,
'asr_reconnect_delay': 1,
'asr_max_reconnect_attempts': 5,
})
def _load_config_files(self):
"""加载配置文件"""
config_files = [
'config/asr_config.json',
'config/llm_config.json',
'config/doubao_config.json'
]
for config_file in config_files:
if os.path.exists(config_file):
try:
with open(config_file, 'r', encoding='utf-8') as f:
file_config = json.load(f)
self._config.update(file_config)
except Exception as e:
print(f"警告: 加载配置文件 {config_file} 失败: {e}")
def get(self, key: str, default=None):
"""获取配置值"""
return self._config.get(key, default)
def set(self, key: str, value: Any):
"""设置配置值"""
self._config[key] = value
def update(self, config_dict: Dict[str, Any]):
"""批量更新配置"""
self._config.update(config_dict)
def save_to_file(self, file_path: str):
"""保存配置到文件"""
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(self._config, f, ensure_ascii=False, indent=2)
def __getattr__(self, name):
"""支持属性访问方式"""
if name in self._config:
return self._config[name]
raise AttributeError(f"配置项 '{name}' 不存在")
# 全局配置实例
_config_manager = ConfigManager()
# 兼容原有的属性访问方式
local_asr_ip = _config_manager.local_asr_ip
local_asr_port = _config_manager.local_asr_port
key_ali_nls_key_id = _config_manager.key_ali_nls_key_id
key_ali_nls_key_secret = _config_manager.key_ali_nls_key_secret
key_ali_nls_app_key = _config_manager.key_ali_nls_app_key
# 导出配置管理器
config = _config_manager
\ No newline at end of file
... ...
# -*- coding: utf-8 -*-
"""
AIfeng/2025-01-27
Core模块初始化文件
"""
from .wsa_server import get_web_instance, get_instance
__all__ = ['get_web_instance', 'get_instance']
\ No newline at end of file
... ...
# -*- coding: utf-8 -*-
"""
AIfeng/2025-01-27
WebSocket服务器管理模块
提供Web和Human连接的管理功能
"""
import queue
from typing import Dict, Any, Optional
from threading import Lock
class WebSocketManager:
"""WebSocket连接管理器"""
def __init__(self):
self._connections = {}
self._command_queue = queue.Queue()
self._lock = Lock()
def is_connected(self, username: str) -> bool:
"""检查用户是否已连接
Args:
username: 用户名
Returns:
是否已连接
"""
with self._lock:
return username in self._connections
def is_connected_human(self, username: str) -> bool:
"""检查人类用户是否已连接
Args:
username: 用户名
Returns:
是否已连接
"""
# 简化实现,与is_connected相同
return self.is_connected(username)
def add_connection(self, username: str, connection: Any):
"""添加连接
Args:
username: 用户名
connection: 连接对象
"""
with self._lock:
self._connections[username] = connection
def remove_connection(self, username: str):
"""移除连接
Args:
username: 用户名
"""
with self._lock:
self._connections.pop(username, None)
def add_cmd(self, command: Dict[str, Any]):
"""添加命令到队列
Args:
command: 命令字典
"""
try:
self._command_queue.put(command, timeout=1.0)
except queue.Full:
print(f"警告: 命令队列已满,丢弃命令: {command}")
def get_cmd(self, timeout: float = 1.0) -> Optional[Dict[str, Any]]:
"""从队列获取命令
Args:
timeout: 超时时间
Returns:
命令字典或None
"""
try:
return self._command_queue.get(timeout=timeout)
except queue.Empty:
return None
def get_connection_count(self) -> int:
"""获取连接数量"""
with self._lock:
return len(self._connections)
def get_usernames(self) -> list:
"""获取所有用户名列表"""
with self._lock:
return list(self._connections.keys())
# 全局实例
_web_instance = WebSocketManager()
_human_instance = WebSocketManager()
def get_web_instance() -> WebSocketManager:
"""获取Web WebSocket管理器实例"""
return _web_instance
def get_instance() -> WebSocketManager:
"""获取Human WebSocket管理器实例"""
return _human_instance
\ No newline at end of file
... ...
# ASR/TTS技术架构分析与扩展方案
**AIfeng/2024-12-19**
## 1. 项目ASR技术实现分析
### 1.1 ASR架构设计
项目采用模块化ASR架构,基于`BaseASR`抽象类实现多种ASR方案:
#### 核心架构组件
- **BaseASR**: 抽象基类,定义统一接口
- **音频处理流水线**: 16kHz采样率,20ms帧长度(320样本/帧)
- **队列机制**: 使用Queue进行音频帧缓冲
- **多进程支持**: 基于torch.multiprocessing实现并发处理
#### 当前ASR实现类型
1. **NerfASR** (`nerfasr.py`)
- 支持多种音频特征: Esperanto(44维)、DeepSpeech(29维)、Hubert(1024维)
- 上下文缓存机制: stride_left + context + stride_right
- GPU/CPU自适应推理
2. **MuseASR** (`museasr.py`)
- 基于Whisper音频特征提取
- 集成Audio2Feature处理器
- 批处理优化(batch_size*2)
3. **HubertASR** (`hubertasr.py`)
- Hubert音频特征处理
- 可配置音频特征长度[8,8]
- 实时音频流处理
4. **LipASR** (`lipasr.py`)
- 基于Wav2Lip的梅尔频谱特征
- 80维梅尔频谱处理
- 唇形同步优化
### 1.2 前端ASR实现
**Web ASR模块** (`web/asr/`)
- **技术栈**: WebSocket + Web Audio API
- **音频格式**: PCM 16kHz 16bit
- **实时传输**: 基于FunASR WebSocket协议
- **浏览器兼容**: 支持现代浏览器录音API
## 2. TTS技术实现分析
### 2.1 TTS架构设计
基于`BaseTTS`抽象类的统一TTS框架:
#### 核心特性
- **异步处理**: 基于线程的TTS渲染
- **流式输出**: 支持实时音频流生成
- **状态管理**: RUNNING/PAUSE状态控制
- **音频重采样**: 统一输出16kHz采样率
### 2.2 TTS服务实现
1. **EdgeTTS**
- 微软Edge浏览器TTS服务
- 异步流式处理
- 多语言支持(zh-CN-XiaoxiaoNeural等)
2. **FishTTS**
- 本地/远程TTS服务
- RESTful API接口
- 流式音频生成
- 参考音频克隆
3. **SovitsTTS**
- GPT-SoVITS语音克隆
- OGG格式流式输出
- 情感语音合成
4. **CosyVoiceTTS**
- 阿里CosyVoice服务
- 高质量语音合成
- 参考音频支持
5. **TencentTTS**
- 腾讯云语音合成
- 企业级API服务
- 多音色支持
6. **XTTS**
- Coqui XTTS服务
- 多语言语音克隆
- 本地部署支持
## 3. 技术架构优势
### 3.1 设计模式优势
- **策略模式**: 通过继承BaseASR/BaseTTS实现算法切换
- **观察者模式**: WebSocket消息推送机制
- **生产者消费者**: 音频队列处理
### 3.2 性能优化
- **批处理**: 音频帧批量处理减少延迟
- **内存管理**: 循环缓冲区避免内存泄漏
- **并发处理**: 多进程/多线程提升吞吐量
## 4. 第三方ASR扩展方案
### 4.1 云端ASR服务集成
#### 4.1.1 百度ASR
```python
class BaiduASR(BaseASR):
def __init__(self, opt, parent):
super().__init__(opt, parent)
self.app_id = os.getenv("BAIDU_APP_ID")
self.api_key = os.getenv("BAIDU_API_KEY")
self.secret_key = os.getenv("BAIDU_SECRET_KEY")
self.client = AipSpeech(self.app_id, self.api_key, self.secret_key)
def run_step(self):
# 实现百度ASR实时识别逻辑
pass
```
#### 4.1.2 阿里云ASR
```python
class AliyunASR(BaseASR):
def __init__(self, opt, parent):
super().__init__(opt, parent)
self.access_key = os.getenv("ALIYUN_ACCESS_KEY")
self.access_secret = os.getenv("ALIYUN_ACCESS_SECRET")
# 初始化阿里云ASR客户端
def run_step(self):
# 实现阿里云实时ASR
pass
```
#### 4.1.3 腾讯云ASR
```python
class TencentASR(BaseASR):
def __init__(self, opt, parent):
super().__init__(opt, parent)
self.secret_id = os.getenv("TENCENT_SECRET_ID")
self.secret_key = os.getenv("TENCENT_SECRET_KEY")
# 初始化腾讯云ASR
```
### 4.2 开源ASR模型集成
#### 4.2.1 Whisper集成
```python
class WhisperASR(BaseASR):
def __init__(self, opt, parent):
super().__init__(opt, parent)
import whisper
self.model = whisper.load_model(opt.whisper_model)
def run_step(self):
# 实现Whisper实时识别
audio_data = self.get_audio_buffer()
result = self.model.transcribe(audio_data)
return result['text']
```
#### 4.2.2 SenseVoice集成
```python
class SenseVoiceASR(BaseASR):
def __init__(self, opt, parent):
super().__init__(opt, parent)
from funasr import AutoModel
self.model = AutoModel(model="sensevoice")
```
### 4.3 配置化ASR选择
```python
# config/asr_config.json
{
"asr_providers": {
"baidu": {
"class": "BaiduASR",
"config": {
"app_id": "${BAIDU_APP_ID}",
"api_key": "${BAIDU_API_KEY}"
}
},
"whisper": {
"class": "WhisperASR",
"config": {
"model_size": "base",
"device": "cuda"
}
}
},
"default_provider": "whisper"
}
```
## 5. 第三方TTS扩展方案
### 5.1 云端TTS服务
#### 5.1.1 百度TTS
```python
class BaiduTTS(BaseTTS):
def __init__(self, opt, parent):
super().__init__(opt, parent)
from aip import AipSpeech
self.client = AipSpeech(app_id, api_key, secret_key)
def txt_to_audio(self, msg):
text, textevent = msg
result = self.client.synthesis(text, 'zh', 1, {
'vol': 5, 'per': 4, 'spd': 5, 'pit': 5
})
self.stream_audio(result, msg)
```
#### 5.1.2 Azure TTS
```python
class AzureTTS(BaseTTS):
def __init__(self, opt, parent):
super().__init__(opt, parent)
import azure.cognitiveservices.speech as speechsdk
self.speech_config = speechsdk.SpeechConfig(
subscription=opt.azure_key,
region=opt.azure_region
)
```
### 5.2 开源TTS模型
#### 5.2.1 Coqui TTS
```python
class CoquiTTS(BaseTTS):
def __init__(self, opt, parent):
super().__init__(opt, parent)
from TTS.api import TTS
self.tts = TTS(model_name=opt.coqui_model)
def txt_to_audio(self, msg):
text, textevent = msg
wav = self.tts.tts(text=text, speaker_wav=opt.REF_FILE)
self.stream_audio_array(wav, msg)
```
#### 5.2.2 PaddleSpeech TTS
```python
class PaddleTTS(BaseTTS):
def __init__(self, opt, parent):
super().__init__(opt, parent)
from paddlespeech.cli.tts import TTSExecutor
self.tts_executor = TTSExecutor()
```
## 6. 本地离线服务优化方案
### 6.1 Docker容器化部署
#### 6.1.1 ASR服务容器
```dockerfile
# Dockerfile.asr
FROM pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime
RUN pip install whisper funasr sensevoice
COPY asr_server.py /app/
COPY models/ /app/models/
EXPOSE 8001
CMD ["python", "/app/asr_server.py"]
```
#### 6.1.2 TTS服务容器
```dockerfile
# Dockerfile.tts
FROM pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime
RUN pip install TTS coqui-ai-tts
COPY tts_server.py /app/
COPY models/ /app/models/
EXPOSE 8002
CMD ["python", "/app/tts_server.py"]
```
### 6.2 微服务架构
```yaml
# docker-compose.yml
version: '3.8'
services:
asr-service:
build:
context: .
dockerfile: Dockerfile.asr
ports:
- "8001:8001"
volumes:
- ./models:/app/models
environment:
- CUDA_VISIBLE_DEVICES=0
tts-service:
build:
context: .
dockerfile: Dockerfile.tts
ports:
- "8002:8002"
volumes:
- ./models:/app/models
environment:
- CUDA_VISIBLE_DEVICES=1
main-app:
build: .
ports:
- "7860:7860"
depends_on:
- asr-service
- tts-service
environment:
- ASR_SERVICE_URL=http://asr-service:8001
- TTS_SERVICE_URL=http://tts-service:8002
```
### 6.3 模型优化策略
#### 6.3.1 模型量化
```python
# 模型量化优化
import torch
from torch.quantization import quantize_dynamic
class OptimizedWhisperASR(WhisperASR):
def __init__(self, opt, parent):
super().__init__(opt, parent)
# 动态量化优化
self.model = quantize_dynamic(
self.model, {torch.nn.Linear}, dtype=torch.qint8
)
```
#### 6.3.2 模型缓存策略
```python
class ModelCache:
def __init__(self):
self.asr_models = {}
self.tts_models = {}
def get_asr_model(self, model_name):
if model_name not in self.asr_models:
self.asr_models[model_name] = self.load_asr_model(model_name)
return self.asr_models[model_name]
def get_tts_model(self, model_name):
if model_name not in self.tts_models:
self.tts_models[model_name] = self.load_tts_model(model_name)
return self.tts_models[model_name]
```
## 7. 性能优化建议
### 7.1 延迟优化
- **流式处理**: 实现真正的流式ASR/TTS
- **预加载**: 模型预热和缓存
- **批处理**: 合理的批处理大小
- **异步处理**: 非阻塞音频处理
### 7.2 资源优化
- **GPU调度**: 智能GPU资源分配
- **内存管理**: 及时释放音频缓冲区
- **模型共享**: 多会话共享模型实例
### 7.3 可扩展性
- **负载均衡**: 多实例部署
- **服务发现**: 动态服务注册
- **监控告警**: 服务健康检查
## 8. 实施路线图
### Phase 1: 基础扩展 (1-2周)
1. 实现Whisper ASR集成
2. 添加百度/阿里云TTS支持
3. 完善配置化选择机制
### Phase 2: 服务化改造 (2-3周)
1. ASR/TTS服务独立部署
2. Docker容器化
3. 微服务架构重构
### Phase 3: 性能优化 (2-3周)
1. 模型量化和优化
2. 缓存策略实施
3. 监控体系建设
### Phase 4: 生产就绪 (1-2周)
1. 负载测试和调优
2. 文档完善
3. 部署自动化
## 9. 风险评估
### 9.1 技术风险
- **模型兼容性**: 不同模型API差异
- **性能瓶颈**: 实时性要求vs模型复杂度
- **资源消耗**: GPU内存和计算资源
### 9.2 缓解策略
- **统一接口**: 抽象层屏蔽差异
- **性能测试**: 提前验证性能指标
- **资源监控**: 实时监控资源使用
---
**技术负责人**: AIfeng
**文档版本**: v1.0
**更新日期**: 2024-12-19
\ No newline at end of file
... ...
# AIfeng/2024-12-19
# 代码质量与可维护性增强建议
## 概述
基于当前豆包模型集成的成功实施,以下是进一步提升代码质量和系统可维护性的建议。这些建议遵循全栈开发架构师的最佳实践,旨在建立长期可持续的技术架构。
## 🏗️ 架构优化建议
### 1. 依赖注入模式
**当前状态:** 直接在函数中硬编码模型选择逻辑
**建议改进:** 实现依赖注入容器
```python
# 建议实现:config/di_container.py
class LLMContainer:
def __init__(self):
self._providers = {}
self._instances = {}
def register(self, interface, implementation):
self._providers[interface] = implementation
def resolve(self, interface):
if interface not in self._instances:
provider = self._providers.get(interface)
if provider:
self._instances[interface] = provider()
return self._instances[interface]
# 使用示例
container = LLMContainer()
container.register('llm_service', DoubaoService)
llm_service = container.resolve('llm_service')
```
### 2. 策略模式重构
**当前状态:** if-elif条件判断选择模型
**建议改进:** 策略模式 + 工厂模式
```python
# 建议实现:llm/strategies/base_strategy.py
from abc import ABC, abstractmethod
class LLMStrategy(ABC):
@abstractmethod
def chat(self, message: str, callback=None) -> str:
pass
@abstractmethod
def get_model_info(self) -> dict:
pass
# llm/strategies/doubao_strategy.py
class DoubaoStrategy(LLMStrategy):
def __init__(self, config):
self.doubao = Doubao(config.get('config_file'))
def chat(self, message: str, callback=None) -> str:
return self.doubao.chat_stream(message, callback)
# llm/factory.py
class LLMFactory:
_strategies = {
'doubao': DoubaoStrategy,
'qwen': QwenStrategy,
}
@classmethod
def create_strategy(cls, model_type: str, config: dict) -> LLMStrategy:
strategy_class = cls._strategies.get(model_type)
if not strategy_class:
raise ValueError(f"Unsupported model type: {model_type}")
return strategy_class(config)
```
### 3. 配置管理中心化
**当前状态:** 多个配置文件分散管理
**建议改进:** 统一配置管理器
```python
# 建议实现:config/config_manager.py
class ConfigManager:
def __init__(self):
self._configs = {}
self._watchers = []
def load_config(self, config_type: str) -> dict:
if config_type not in self._configs:
self._configs[config_type] = self._load_from_file(config_type)
return self._configs[config_type]
def reload_config(self, config_type: str):
"""支持热重载配置"""
self._configs[config_type] = self._load_from_file(config_type)
self._notify_watchers(config_type)
def watch_config(self, callback):
"""配置变更监听"""
self._watchers.append(callback)
```
## 🔧 代码质量提升
### 1. 类型注解完善
**当前状态:** 部分函数缺少类型注解
**建议改进:** 全面添加类型提示
```python
# 建议改进示例
from typing import Dict, Any, Optional, Callable, Union
from dataclasses import dataclass
@dataclass
class LLMResponse:
content: str
model: str
tokens_used: int
response_time: float
def llm_response(
message: str,
nerfreal: BaseReal,
config: Optional[Dict[str, Any]] = None
) -> LLMResponse:
"""LLM响应函数,支持多种模型配置"""
pass
```
### 2. 错误处理标准化
**当前状态:** 简单的try-catch处理
**建议改进:** 自定义异常类型和错误处理链
```python
# 建议实现:llm/exceptions.py
class LLMException(Exception):
"""LLM基础异常类"""
pass
class ConfigurationError(LLMException):
"""配置错误"""
pass
class APIKeyError(LLMException):
"""API密钥错误"""
pass
class ModelNotFoundError(LLMException):
"""模型不存在错误"""
pass
# 错误处理装饰器
def handle_llm_errors(func):
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except APIKeyError as e:
logger.error(f"API密钥错误: {e}")
return ErrorResponse("API密钥配置错误,请检查配置")
except ConfigurationError as e:
logger.error(f"配置错误: {e}")
return ErrorResponse("配置文件错误,请检查配置")
return wrapper
```
### 3. 日志系统增强
**当前状态:** 基础日志记录
**建议改进:** 结构化日志和链路追踪
```python
# 建议实现:logger/structured_logger.py
import structlog
from datetime import datetime
import uuid
class LLMLogger:
def __init__(self):
self.logger = structlog.get_logger()
def log_request(self, request_id: str, model: str, message: str):
self.logger.info(
"llm_request_start",
request_id=request_id,
model=model,
message_length=len(message),
timestamp=datetime.utcnow().isoformat()
)
def log_response(self, request_id: str, response_time: float, tokens: int):
self.logger.info(
"llm_request_complete",
request_id=request_id,
response_time=response_time,
tokens_used=tokens,
timestamp=datetime.utcnow().isoformat()
)
```
## 🧪 测试策略完善
### 1. 单元测试覆盖
**建议实现:** 完整的测试套件
```python
# test/test_doubao_integration.py
import pytest
from unittest.mock import Mock, patch
from llm.Doubao import Doubao
class TestDoubaoIntegration:
@pytest.fixture
def mock_config(self):
return {
"api_key": "test_key",
"model": "test_model",
"character": {"name": "测试AI"}
}
@patch('llm.Doubao.requests.post')
def test_chat_success(self, mock_post, mock_config):
# 测试正常对话流程
mock_response = Mock()
mock_response.status_code = 200
mock_response.json.return_value = {"choices": [{"message": {"content": "测试回复"}}]}
mock_post.return_value = mock_response
doubao = Doubao()
response = doubao.chat("测试消息")
assert response == "测试回复"
mock_post.assert_called_once()
def test_api_key_validation(self):
# 测试API密钥验证
with pytest.raises(ValueError, match="API密钥未配置"):
Doubao()
```
### 2. 集成测试自动化
```python
# test/integration/test_llm_pipeline.py
class TestLLMPipeline:
def test_model_switching(self):
"""测试模型切换功能"""
# 测试从qwen切换到doubao
config = {"model_type": "doubao"}
response = llm_response("测试消息", mock_nerfreal, config)
assert response is not None
def test_config_hot_reload(self):
"""测试配置热重载"""
# 修改配置文件
# 验证配置自动重载
pass
```
## 📊 性能监控与优化
### 1. 性能指标收集
```python
# 建议实现:monitoring/metrics.py
from dataclasses import dataclass
from typing import Dict
import time
@dataclass
class PerformanceMetrics:
model_type: str
init_time: float
first_token_time: float
total_response_time: float
tokens_per_second: float
memory_usage: float
class MetricsCollector:
def __init__(self):
self.metrics_history = []
def collect_metrics(self, metrics: PerformanceMetrics):
self.metrics_history.append(metrics)
self._export_to_monitoring_system(metrics)
def get_performance_report(self) -> Dict:
"""生成性能报告"""
if not self.metrics_history:
return {}
recent_metrics = self.metrics_history[-100:] # 最近100次请求
return {
"avg_response_time": sum(m.total_response_time for m in recent_metrics) / len(recent_metrics),
"avg_tokens_per_second": sum(m.tokens_per_second for m in recent_metrics) / len(recent_metrics),
"model_distribution": self._get_model_distribution(recent_metrics)
}
```
### 2. 缓存策略
```python
# 建议实现:cache/llm_cache.py
from functools import lru_cache
import hashlib
import json
class LLMCache:
def __init__(self, max_size: int = 1000):
self.cache = {}
self.max_size = max_size
def get_cache_key(self, message: str, model_config: dict) -> str:
"""生成缓存键"""
content = f"{message}_{json.dumps(model_config, sort_keys=True)}"
return hashlib.md5(content.encode()).hexdigest()
def get(self, cache_key: str) -> Optional[str]:
return self.cache.get(cache_key)
def set(self, cache_key: str, response: str):
if len(self.cache) >= self.max_size:
# LRU淘汰策略
oldest_key = next(iter(self.cache))
del self.cache[oldest_key]
self.cache[cache_key] = response
```
## 🔒 安全性增强
### 1. 敏感信息保护
```python
# 建议实现:security/secret_manager.py
import os
from cryptography.fernet import Fernet
class SecretManager:
def __init__(self):
self.cipher_suite = Fernet(self._get_encryption_key())
def _get_encryption_key(self) -> bytes:
key = os.getenv('ENCRYPTION_KEY')
if not key:
key = Fernet.generate_key()
# 保存到安全位置
return key.encode() if isinstance(key, str) else key
def encrypt_api_key(self, api_key: str) -> str:
return self.cipher_suite.encrypt(api_key.encode()).decode()
def decrypt_api_key(self, encrypted_key: str) -> str:
return self.cipher_suite.decrypt(encrypted_key.encode()).decode()
```
### 2. 输入验证和清理
```python
# 建议实现:security/input_validator.py
import re
from typing import List
class InputValidator:
DANGEROUS_PATTERNS = [
r'<script[^>]*>.*?</script>', # XSS
r'javascript:', # JavaScript协议
r'data:text/html', # Data URI
]
def validate_message(self, message: str) -> bool:
"""验证用户输入消息"""
if len(message) > 10000: # 长度限制
return False
for pattern in self.DANGEROUS_PATTERNS:
if re.search(pattern, message, re.IGNORECASE):
return False
return True
def sanitize_message(self, message: str) -> str:
"""清理用户输入"""
# 移除危险字符
sanitized = re.sub(r'[<>"\']', '', message)
return sanitized.strip()
```
## 📚 文档和规范
### 1. API文档自动生成
```python
# 建议实现:使用FastAPI自动生成API文档
from fastapi import FastAPI
from pydantic import BaseModel
class ChatRequest(BaseModel):
message: str
model_type: str = "doubao"
stream: bool = True
class ChatResponse(BaseModel):
response: str
model: str
tokens_used: int
response_time: float
app = FastAPI(title="LLM Chat API", version="1.0.0")
@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(request: ChatRequest):
"""聊天接口
支持多种LLM模型的聊天功能:
- 豆包模型:高质量中文对话
- 通义千问:阿里云大模型
"""
pass
```
### 2. 代码规范检查
```yaml
# 建议添加:.pre-commit-config.yaml
repos:
- repo: https://github.com/psf/black
rev: 22.3.0
hooks:
- id: black
language_version: python3.8
- repo: https://github.com/pycqa/flake8
rev: 4.0.1
hooks:
- id: flake8
args: [--max-line-length=88]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.950
hooks:
- id: mypy
additional_dependencies: [types-requests]
```
## 🚀 部署和运维
### 1. 容器化部署
```dockerfile
# 建议改进:Dockerfile.llm
FROM python:3.9-slim
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制代码
COPY llm/ ./llm/
COPY config/ ./config/
COPY *.py ./
# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "from llm import llm_response; print('OK')" || exit 1
CMD ["python", "app.py"]
```
### 2. 监控和告警
```python
# 建议实现:monitoring/health_check.py
class HealthChecker:
def __init__(self):
self.checks = {
'config_files': self._check_config_files,
'model_availability': self._check_model_availability,
'api_connectivity': self._check_api_connectivity,
}
def run_health_check(self) -> Dict[str, bool]:
results = {}
for check_name, check_func in self.checks.items():
try:
results[check_name] = check_func()
except Exception as e:
logger.error(f"Health check {check_name} failed: {e}")
results[check_name] = False
return results
def _check_config_files(self) -> bool:
required_files = ['config/llm_config.json', 'config/doubao_config.json']
return all(os.path.exists(f) for f in required_files)
```
## 📈 实施优先级
### 高优先级(立即实施)
1. ✅ 类型注解完善
2. ✅ 错误处理标准化
3. ✅ 单元测试覆盖
4. ✅ 输入验证和清理
### 中优先级(1-2周内)
1. 🔄 策略模式重构
2. 🔄 配置管理中心化
3. 🔄 性能监控系统
4. 🔄 缓存策略实施
### 低优先级(长期规划)
1. ⏳ 依赖注入容器
2. ⏳ 微服务架构拆分
3. ⏳ 分布式缓存
4. ⏳ 自动化运维
## 总结
这些建议基于当前豆包模型集成的成功经验,旨在建立一个可扩展、可维护、高性能的LLM服务架构。建议按优先级逐步实施,确保每个改进都经过充分测试和验证。
通过这些改进,系统将具备:
- 🏗️ 更好的架构设计
- 🔧 更高的代码质量
- 🧪 更完善的测试覆盖
- 📊 更强的性能监控
- 🔒 更好的安全保障
- 📚 更完整的文档
- 🚀 更便捷的部署运维
---
**开发者**: AIfeng
**更新时间**: 2024-12-19
**版本**: 1.0.0
\ No newline at end of file
... ...
# AIfeng/2025-06-26
# 豆包大模型集成使用指南
## 概述
本项目已成功集成火山引擎豆包大模型,支持与原有通义千问模型无缝切换。豆包模型提供了丰富的配置选项,包括人物设定、对话风格、API 参数等,全部采用配置文件管理。
## 快速开始
### 1. 获取 API 密钥
访问 [火山引擎控制台](https://console.volcengine.com/) 获取豆包 API 密钥:
1. 登录火山引擎控制台
2. 进入"豆包大模型"服务
3. 创建 API 密钥
4. 复制 API Key 和模型端点 ID
### 2. 配置环境变量
设置豆包 API 密钥环境变量:
```bash
# Windows
set DOUBAO_API_KEY=your_api_key_here
# Linux/Mac
export DOUBAO_API_KEY=your_api_key_here
```
### 3. 配置模型选择
编辑 `config/llm_config.json`,设置使用豆包模型:
```json
{
"model_type": "doubao"
}
```
### 4. 自定义豆包配置
编辑 `config/doubao_config.json` 配置文件:
```json
{
"model": "your_endpoint_id_here",
"api_key": "可选:直接在配置文件中设置",
"character": {
"name": "小艺",
"personality": "友善、专业、有趣",
"background": "AI语音聊天机器人"
}
}
```
## 配置文件详解
### LLM 统一配置 (`config/llm_config.json`)
```json
{
"model_type": "doubao", // 模型类型:"qwen" "doubao"
"description": "LLM模型配置文件",
"models": {
"qwen": {
"name": "通义千问",
"api_key_env": "DASHSCOPE_API_KEY"
},
"doubao": {
"name": "豆包大模型",
"config_file": "config/doubao_config.json"
}
},
"settings": {
"stream": true,
"sentence_split_chars": ",.!;:,。!?:;",
"min_sentence_length": 10,
"log_performance": true
}
}
```
### 豆包详细配置 (`config/doubao_config.json`)
#### API 配置
```json
{
"api_key": "可选:API密钥(推荐使用环境变量)",
"base_url": "https://ark.cn-beijing.volces.com/api/v3",
"model": "ep-20241219000000-xxxxx", // 您的模型端点ID
"stream": true,
"max_tokens": 2048,
"temperature": 0.7,
"top_p": 0.9
}
```
#### 人物设定
```json
{
"character": {
"name": "小艺",
"personality": "友善、专业、有趣的AI助手",
"background": "由艺云展陈开发的AI语音聊天机器人",
"speaking_style": "简洁明了,富有亲和力",
"expertise": ["日常对话", "信息查询", "问题解答"],
"constraints": ["保持礼貌", "提供准确信息", "避免敏感话题"]
}
}
```
#### 响应配置
```json
{
"response_config": {
"max_response_length": 500,
"response_format": "conversational",
"enable_context_memory": true,
"context_window_size": 10
}
}
```
## 使用方式
### 模型切换
1. **切换到豆包模型**
```json
// config/llm_config.json
{ "model_type": "doubao" }
```
2. **切换到通义千问**
```json
// config/llm_config.json
{ "model_type": "qwen" }
```
3. **重启应用**使配置生效
### 人物设定自定义
编辑 `config/doubao_config.json` 中的 `character` 部分:
```json
{
"character": {
"name": "您的AI助手名称",
"personality": "描述AI的性格特点",
"background": "AI的背景设定",
"speaking_style": "对话风格描述",
"expertise": ["专长领域1", "专长领域2"],
"constraints": ["行为约束1", "行为约束2"]
}
}
```
### API 参数调优
根据需要调整以下参数:
- **temperature** (0.0-1.0):控制回复的随机性
- 0.0:最确定性的回复
- 1.0:最随机的回复
- 推荐:0.7
- **top_p** (0.0-1.0):控制词汇选择的多样性
- 推荐:0.9
- **max_tokens**:最大回复长度
- 推荐:1024-2048
## 性能监控
系统会自动记录以下性能指标:
- 模型初始化时间
- 首个 token 响应时间
- 总响应时间
- 分句输出日志
查看日志了解性能表现:
```
豆包模型初始化时间: 0.123s
豆包首个token时间: 0.456s
豆包总响应时间: 2.789s
```
## 故障排除
### 常见问题
1. **API 密钥错误**
```
ValueError: 豆包API密钥未配置
```
解决:检查环境变量 `DOUBAO_API_KEY` 或配置文件中的 `api_key`
2. **模型端点错误**
```
HTTP 404: 模型不存在
```
解决:检查 `config/doubao_config.json` 中的 `model` 字段
3. **配置文件格式错误**
```
json.JSONDecodeError
```
解决:使用 JSON 验证工具检查配置文件格式
4. **模块导入失败**
```
ModuleNotFoundError: No module named 'llm.Doubao'
```
解决:确保 `llm/__init__.py` 文件存在
### 调试模式
运行测试脚本验证集成:
```bash
python test_doubao_integration.py
```
## 最佳实践
1. **安全性**
- 使用环境变量存储 API 密钥
- 不要在代码中硬编码敏感信息
- 定期轮换 API 密钥
2. **性能优化**
- 根据应用场景调整 `max_tokens`
- 使用流式响应提升用户体验
- 监控 API 调用频率和成本
3. **配置管理**
- 为不同环境维护不同的配置文件
- 使用版本控制管理配置变更
- 定期备份配置文件
4. **人物设定**
- 明确定义 AI 的角色和能力边界
- 设置合适的对话风格和语调
- 定期根据用户反馈优化设定
## 技术支持
如遇到问题,请:
1. 查看应用日志获取详细错误信息
2. 运行集成测试脚本诊断问题
3. 检查配置文件格式和内容
4. 验证 API 密钥和网络连接
---
**开发者**: AIfeng
**更新时间**: 2025-06-26
**版本**: 1.0.0
... ...
# AIfeng/2025-01-27
# FunASR集成分析与优化方案
## 问题分析
### 1. 当前状况
- **ASR_server.py**: 实际启动的是**WebSocket服务**,而非gRPC服务(注释中的"grpc server port"是误导性的)
- **服务协议**: 使用`websockets`库提供WebSocket接口,监听指定host:port
- **依赖缺失**: `funasr.py`和`ali_nls.py`引用的`core`、`utils`模块在当前项目中不存在
### 2. 技术架构分析
#### ASR_server.py实现要点
```python
# WebSocket服务器,非gRPC
server = await websockets.serve(ws_serve, args.host, args.port, ping_interval=10)
# 模型初始化
asr_model = AutoModel(
model="paraformer-zh", model_revision="v2.0.4",
vad_model="fsmn-vad", vad_model_revision="v2.0.4",
punc_model="ct-punc-c", punc_model_revision="v2.0.4",
device=f"cuda:{args.gpu_id}" if args.ngpu else "cpu"
)
```
#### 连接方式
- **协议**: WebSocket (ws://)
- **默认端口**: 10197
- **消息格式**: JSON + 二进制音频数据
- **处理模式**: 异步队列处理音频文件
## 集成方案
### 方案一:简化集成(推荐)
创建独立的FunASR客户端,避免依赖缺失问题:
```python
# funasr_simple_client.py
import asyncio
import websockets
import json
import wave
import threading
from queue import Queue
class SimpleFunASRClient:
def __init__(self, host="127.0.0.1", port=10197):
self.host = host
self.port = port
self.websocket = None
self.result_queue = Queue()
self.connected = False
async def connect(self):
uri = f"ws://{self.host}:{self.port}"
try:
self.websocket = await websockets.connect(uri)
self.connected = True
# 启动消息接收线程
threading.Thread(target=self._receive_messages, daemon=True).start()
return True
except Exception as e:
print(f"连接失败: {e}")
return False
def _receive_messages(self):
async def receive():
try:
async for message in self.websocket:
if isinstance(message, str):
self.result_queue.put(message)
except Exception as e:
print(f"接收消息错误: {e}")
self.connected = False
asyncio.run(receive())
async def recognize_file(self, wav_path):
if not self.connected:
return None
# 发送文件路径进行识别
message = {"url": wav_path}
await self.websocket.send(json.dumps(message))
# 等待结果
try:
result = self.result_queue.get(timeout=10)
return result
except:
return None
async def close(self):
if self.websocket:
await self.websocket.close()
self.connected = False
```
### 方案二:修复现有代码
创建缺失的配置模块:
```python
# config_util.py
class Config:
# ASR配置
local_asr_ip = "127.0.0.1"
local_asr_port = 10197
# 阿里云NLS配置
key_ali_nls_key_id = "your_key_id"
key_ali_nls_key_secret = "your_key_secret"
key_ali_nls_app_key = "your_app_key"
# 创建配置实例
config = Config()
# 为了兼容原代码的导入方式
local_asr_ip = config.local_asr_ip
local_asr_port = config.local_asr_port
key_ali_nls_key_id = config.key_ali_nls_key_id
key_ali_nls_key_secret = config.key_ali_nls_key_secret
key_ali_nls_app_key = config.key_ali_nls_app_key
```
## 连接验证
### 测试连接脚本
```python
# test_funasr_connection.py
import asyncio
import websockets
import json
async def test_connection():
uri = "ws://127.0.0.1:10197"
try:
async with websockets.connect(uri) as websocket:
print("✅ FunASR服务连接成功")
# 测试发送消息
test_message = {"url": "test.wav"}
await websocket.send(json.dumps(test_message))
print("✅ 消息发送成功")
# 等待响应
try:
response = await asyncio.wait_for(websocket.recv(), timeout=5)
print(f"✅ 收到响应: {response}")
except asyncio.TimeoutError:
print("⚠️ 未收到响应(可能因为测试文件不存在)")
except ConnectionRefusedError:
print("❌ 连接被拒绝,请确认FunASR服务已启动")
except Exception as e:
print(f"❌ 连接失败: {e}")
if __name__ == "__main__":
asyncio.run(test_connection())
```
## 部署建议
### 1. 服务启动
```bash
# CPU模式
python -u ASR_server.py --host "127.0.0.1" --port 10197 --ngpu 0
# GPU模式
python -u ASR_server.py --host "127.0.0.1" --port 10197 --ngpu 1 --gpu_id 0
```
### 2. 依赖安装
```bash
pip install torch modelscope websockets FunASR torchaudio
```
### 3. 热词配置
创建`data/hotword.txt`文件,每行一个热词:
```
数字人
语音识别
实时对话
```
## 性能优化
### 1. 模型优化
- **量化**: 使用INT8量化减少内存占用
- **批处理**: 支持批量音频处理
- **缓存**: 模型预加载和结果缓存
### 2. 网络优化
- **连接池**: 维护WebSocket连接池
- **重连机制**: 自动重连和错误恢复
- **负载均衡**: 多实例部署
### 3. 监控指标
- 连接数量
- 处理延迟
- 识别准确率
- 资源使用率
## 技术债务
1. **依赖管理**: 原有代码依赖外部模块,需要重构或补全
2. **错误处理**: 缺少完善的异常处理和重连机制
3. **配置管理**: 硬编码配置需要外部化
4. **日志系统**: 缺少结构化日志记录
5. **测试覆盖**: 需要添加单元测试和集成测试
## 实施建议
1. **短期**: 使用简化客户端快速集成
2. **中期**: 修复依赖问题,完善错误处理
3. **长期**: 重构为微服务架构,支持多模型切换
## 结论
FunASR服务使用WebSocket协议,可以正常连接。主要问题是现有集成代码缺少依赖模块。建议采用简化集成方案快速解决连接问题,后续逐步完善架构。
\ No newline at end of file
... ...
# 更新日志
## 2025-05-27 - SessionId管理机制优化 - AIfeng
### 功能增强:SessionId持久化存储和恢复机制
**问题背景:**
- 启动日志显示sessionId是在数字人模型启动时创建的
- 页面刷新后sessionId重置为0,与后端运行的数字人会话不一致
- 缺少sessionId的持久化管理机制
- 用户无法手动重置会话连接
**解决方案:**
1. **SessionId本地存储**
- 实现sessionId的localStorage持久化存储
- 页面刷新后自动恢复之前的sessionId
- 显示当前会话ID状态
2. **会话状态管理**
- 新增可视化的当前会话ID显示框
- 实现会话重置功能,支持手动清除sessionId
- 自动重连机制,页面刷新后尝试恢复WebSocket连接
3. **用户交互优化**
- 添加"重置"按钮,允许用户手动断开并重新连接
- 提供确认对话框,防止误操作
- 增强状态提示和用户反馈
**技术实现:**
```javascript
// SessionId管理功能
function saveSessionId(sessionId) {
localStorage.setItem('currentSessionId', sessionId);
document.getElementById('current-sessionid').value = sessionId;
}
function restoreSessionId() {
var savedSessionId = localStorage.getItem('currentSessionId');
if (savedSessionId && savedSessionId !== '0') {
document.getElementById('sessionid').value = savedSessionId;
return savedSessionId;
}
return null;
}
```
**界面改进:**
- 新增"当前会话ID"显示框,实时显示连接状态
- 添加"重置"按钮,支持会话管理操作
- 优化用户体验,提供清晰的状态反馈
---
## 2025-01-27 - FunASR集成分析与连接验证 - AIfeng
### 技术分析:FunASR服务架构与集成方案
**问题识别:**
- 用户从其他项目拷贝的FunASR相关文件存在依赖缺失问题
- ASR_server.py实际提供WebSocket服务,而非gRPC服务
- funasr.py和ali_nls.py引用的core、utils模块在当前项目中不存在
- 需要验证本地FunASR服务的连接可行性
**技术架构分析:**
1. **服务协议确认**
- ASR_server.py使用websockets库提供WebSocket接口
- 默认监听端口10197,支持CPU/GPU模式切换
- 消息格式:JSON字符串 + 二进制音频数据
- 异步队列处理音频文件识别请求
2. **模型配置**
- 使用FunASR的paraformer-zh模型进行中文语音识别
- 集成VAD(语音活动检测)和标点预测功能
- 支持热词配置,提升特定领域识别准确率
3. **依赖问题分析**
- 缺失模块:core.wsa_server, utils.config_util, utils.util
- 现有代码无法直接运行,需要重构或补全依赖
**解决方案设计:**
1. **简化集成方案(推荐)**
- 创建独立的SimpleFunASRClient类
- 避免复杂依赖,直接使用WebSocket通信
- 提供异步音频识别接口
2. **连接验证工具**
- 开发test_funasr_connection.py测试脚本
- 支持基本连接测试、音频识别测试、消息格式验证
- 自动生成测试音频文件,验证完整流程
**技术实现要点:**
```python
# 简化客户端实现
class SimpleFunASRClient:
async def connect(self):
uri = f"ws://{self.host}:{self.port}"
self.websocket = await websockets.connect(uri)
async def recognize_file(self, wav_path):
message = {"url": wav_path}
await self.websocket.send(json.dumps(message))
return await self.websocket.recv()
```
**部署指导:**
- 启动命令:`python -u ASR_server.py --host "127.0.0.1" --port 10197 --ngpu 0`
- 依赖安装:torch, modelscope, websockets, FunASR, torchaudio
- 热词配置:创建data/hotword.txt文件
**性能优化建议:**
1. 模型量化减少内存占用
2. WebSocket连接池管理
3. 自动重连和错误恢复机制
4. 结构化日志和监控指标
**技术债务识别:**
- 原有代码依赖外部模块,需要重构
- 缺少完善的异常处理机制
- 配置管理硬编码,需要外部化
- 缺少单元测试和集成测试覆盖
**集成建议:**
- 短期:使用简化客户端快速验证连接
- 中期:修复依赖问题,完善错误处理
- 长期:重构为微服务架构,支持多模型切换
---
# 2025-01-27 FunASR测试增强 - 实际音频文件测试功能
**AIfeng/2025-01-27**
## 问题背景
用户提供了三个实际音频文件(yunxi.mp3、yunxia.mp3、yunyang.mp3),需要在现有测试脚本中增加对真实音频文件的识别测试功能,以验证FunASR服务的实际效果。
## 技术实现
### 新增测试方法
- **test_real_audio_files()**: 专门测试实际音频文件的识别功能
- 支持批量测试多个音频文件
- 文件存在性检查
- 30秒超时保护机制
- 详细的识别结果解析和展示
- 错误处理和状态分类
### 测试流程优化
- 将实际音频文件测试集成到主测试序列中
- 测试顺序:基础连接 → 音频识别 → **实际音频文件** → 消息格式
- 增加文件间等待机制,避免服务器压力
### 功能特性
1. **智能文件检测**: 自动检查音频文件是否存在
2. **多格式响应处理**: 支持JSON和纯文本响应解析
3. **详细状态分类**: success/received/timeout/error四种状态
4. **可视化结果展示**: 使用emoji和格式化输出提升可读性
5. **超时保护**: 30秒超时机制防止测试卡死
### 测试结果输出
```
📊 实际音频文件测试总结:
1. 文件: yunxi.mp3
✅ 识别成功: [识别文本]
2. 文件: yunxia.mp3
✅ 识别成功: [识别文本]
3. 文件: yunyang.mp3
✅ 识别成功: [识别文本]
```
## 技术要点
- **异步处理**: 使用asyncio.wait_for实现超时控制
- **错误容错**: 完善的异常处理机制
- **资源管理**: 每个文件独立WebSocket连接,避免状态污染
- **性能优化**: 文件间1秒等待,平衡测试效率和服务器负载
## 使用方法
```bash
# 确保音频文件在项目根目录
# 启动FunASR服务
python -u ASR_server.py --host "127.0.0.1" --port 10197 --ngpu 0
# 运行增强测试
python test_funasr_connection.py
```
## 技术价值
- **验证真实场景**: 从测试WAV文件升级到实际音频文件测试
- **提升测试覆盖**: 增加对MP3格式和真实语音内容的测试
- **改善用户体验**: 直观的测试结果展示和状态反馈
- **增强可维护性**: 模块化的测试方法设计
---
## 2024-12-19 ASR/TTS技术架构分析与扩展方案设计 - AIfeng
### 新增文档
- `doc/dev/asr_tts_architecture_analysis.md` - ASR/TTS技术架构分析与扩展方案
### 技术分析内容
1. **ASR技术实现分析**
- 基于BaseASR的模块化架构设计
- 4种ASR实现: NerfASR、MuseASR、HubertASR、LipASR
- Web端实时ASR基于WebSocket + Web Audio API
- 音频处理流水线: 16kHz采样率,20ms帧长度
2. **TTS技术实现分析**
- 基于BaseTTS的统一框架
- 6种TTS服务: EdgeTTS、FishTTS、SovitsTTS、CosyVoiceTTS、TencentTTS、XTTS
- 异步流式处理架构
- 统一16kHz音频输出
3. **扩展方案设计**
- 第三方ASR集成: 百度、阿里云、腾讯云、Whisper、SenseVoice
- 第三方TTS集成: 百度、Azure、Coqui、PaddleSpeech
- 本地离线服务Docker容器化部署
- 微服务架构重构方案
4. **性能优化策略**
- 模型量化和缓存机制
- 流式处理和异步优化
- GPU资源调度和负载均衡
### 实施建议
- Phase 1: 基础扩展(1-2周) - Whisper ASR + 云端TTS
- Phase 2: 服务化改造(2-3周) - Docker容器化
- Phase 3: 性能优化(2-3周) - 模型优化
- Phase 4: 生产就绪(1-2周) - 部署自动化
### 技术债务识别
- 当前ASR/TTS耦合度较高,需要服务化解耦
- 缺乏统一的配置管理机制
- 性能监控和告警体系待建设
**分析人员**: AIfeng
**工作类型**: 技术架构分析
**影响范围**: ASR/TTS模块
**验证结果:**
- ✅ 页面刷新后sessionId自动恢复
- ✅ 与后端数字人会话保持一致性
- ✅ 支持手动会话重置和重连
- ✅ 提升用户操作便利性
---
## 2025-05-27 - WebSocket连接时序问题修复 - AIfeng
### 问题修复:刷新页面后sessionId未被websocket_connections接收
**问题描述:**
- 刷新页面重新连接数字人后,新的sessionId没有被websocket_connections正确接收
- WebRTC连接建立和WebSocket连接存在时序问题
- sessionId在WebSocket登录时可能仍为0,导致连接关联失败
**根本原因:**
- WebSocket连接建立速度快于WebRTC协商完成
- negotiate()函数设置sessionId到DOM元素存在异步延迟
- connectWebSocket()函数立即读取sessionId值,可能获取到初始值0
**修复方案:**
1. **实现重试机制**
- 在WebSocket连接建立后,等待sessionId正确设置
- 最多重试20次,每次间隔200ms
- 总等待时间不超过4秒
2. **增强日志输出**
- 详细记录sessionId获取过程
- 标记重试次数和等待状态
- 区分正常连接和异常情况
3. **容错处理**
- 即使sessionId为0也允许连接,但记录错误日志
- 避免因时序问题完全阻断WebSocket连接
**技术实现:**
```javascript
function attemptLogin(retryCount = 0) {
var sessionid = parseInt(document.getElementById('sessionid').value) || 0;
if (sessionid === 0 && retryCount < 20) {
console.log(`等待sessionid设置,重试次数: ${retryCount + 1}/20`);
setTimeout(() => attemptLogin(retryCount + 1), 200);
return;
}
// 发送登录消息逻辑...
}
```
**验证结果:**
- ✅ 解决刷新页面后sessionId时序问题
- ✅ 确保新sessionId正确收入websocket_connections
- ✅ 提升WebSocket连接稳定性
- ✅ 增强错误诊断能力
---
## 2025-06-27 - WebSocket消息处理逻辑重构 - AIfeng
### 问题修复:前端消息显示不一致
**问题描述:**
- 对话框中仅显示页面端发出的数据,缺少第三方服务推送的消息
- 用户消息和AI回复未通过WebSocket统一推送
- 语音输入消息直接添加到界面,未等待服务器确认
- 缺少不同大模型的标识区分
**修复方案:**
1. **统一消息推送机制**
- 移除前端直接添加消息到界面的逻辑
- 所有消息(用户输入、语音输入、AI回复)均通过WebSocket推送
- 添加`X-Request-Source: 'web'`头部标识消息来源
2. **新增聊天消息处理**
- 添加`chat_message`类型的WebSocket消息处理
- 支持消息发送者识别(user/human/ai/assistant)
- 集成模型信息和请求来源显示
3. **本地存储增强**
- 自动保存聊天记录到本地存储
- 支持按sessionId区分不同会话
- 记录时间戳、模型信息等元数据
**技术实现:**
```javascript
// WebSocket聊天消息处理
if (messageData.type === 'chat_message') {
var alignment = sender === 'user' ? 'right' : 'left';
var senderLabel = modelInfo ? `AI回复(${modelInfo})` : 'AI回复';
addMessage(messageContent, alignment, senderLabel, messageMode, modelInfo, requestSource);
}
// 移除直接添加消息逻辑
fetch('/human', {
headers: {
'Content-Type': 'application/json',
'X-Request-Source': 'web'
}
});
```
**验证结果:**
- ✅ 前端完全依赖WebSocket接收消息
- ✅ 支持第三方服务推送消息显示
- ✅ 语音输入通过服务器确认后显示
---
## 2025-01-27 - WebSocket连接时序问题修复 - AIfeng
### 问题修复:刷新页面后新sessionId未加入websocket_connections
**问题描述:**
- 页面刷新后,WebSocket连接在页面加载时立即建立
- 此时sessionId仍为默认值0,WebRTC连接尚未建立
- 真正的sessionId在WebRTC连接建立后才从服务器获取
- 导致新会话的WebSocket连接无法正确关联到websocket_connections
**根本原因:**
连接建立时序错误:WebSocket连接 → sessionId获取,应该是:sessionId获取 → WebSocket连接
**修复方案:**
1. **调整连接时序**
- 移除页面加载时的自动WebSocket连接
- 在WebRTC连接建立并获得sessionId后触发WebSocket连接
2. **前端逻辑优化**
- 修改`client.js`:在设置sessionId后触发WebSocket连接
- 修改`webrtcapichat.html`:移除页面初始化时的connectWebSocket调用
- 添加sessionId有效性验证和警告日志
3. **保持重连机制**
- 保留页面可见性变化时的重连逻辑
- 保留网络异常时的自动重连机制
**技术实现:**
```javascript
// client.js - 在获得sessionId后触发WebSocket连接
.then((answer) => {
document.getElementById('sessionid').value = answer.sessionid
console.log('SessionID已设置:', answer.sessionid);
if (typeof connectWebSocket === 'function') {
console.log('触发WebSocket连接...');
connectWebSocket();
}
return pc.setRemoteDescription(answer);
})
// webrtcapichat.html - 移除自动连接
// connectWebSocket(); // 移除自动连接,改为在获得sessionid后连接
// 添加sessionId验证
if (sessionid === 0) {
console.warn('警告: sessionid为0,可能WebRTC连接尚未建立');
}
```
**验证结果:**
- ✅ WebSocket连接在获得有效sessionId后建立
- ✅ 新会话正确添加到websocket_connections中
- ✅ 保持原有重连和错误处理机制
- ✅ 添加调试日志便于问题排查
- ✅ AI回复显示模型信息标识
- ✅ 本地存储自动保存聊天记录
---
## 2024-12-19 SessionId 管理机制优化
**AIfeng/2024-12-19**
### 问题描述
前端页面刷新后sessionId丢失,导致数字人会话中断,用户体验不佳。
### 解决方案
1. **持久化存储**: 实现sessionId的本地存储和自动恢复机制
2. **会话状态管理**: 添加会话ID显示和手动重置功能
3. **智能重连**: 页面加载时自动尝试恢复之前的会话连接
4. **用户交互优化**: 提供直观的会话状态反馈和控制选项
### 技术实现
- 在`webrtcapichat.html`中添加sessionId管理函数:`saveSessionId`、`restoreSessionId`、`clearSessionId`
- 修改`client.js`中的`negotiate`函数,在获取sessionId后自动保存到localStorage
- 在WebSocket连接成功后更新UI显示当前sessionId
- 添加"重置"按钮支持手动清除sessionId并重新连接
- 页面初始化时自动尝试恢复sessionId并延迟重连WebSocket
### 界面改进
- 当前会话ID输入框:实时显示连接状态和sessionId值
- 重置按钮:支持用户确认后清除会话并提示重新连接
- 状态提示:连接成功后placeholder显示"已连接"
### ChatOverlay 对话框优化
**背景**: 对话框遮挡数字人界面,影响视觉体验;缺乏灵活的显示控制和透明度调节功能。
**解决方案**:
1. **透明度优化**: 将所有背景透明度调整至50%,减少对数字人界面的遮挡
2. **功能重构**: 将"清空对话记录"按钮改为"隐藏对话框"功能
3. **配置管理**: 在侧边栏新增"对话框配置"模块,集中管理对话框相关设置
4. **持久化配置**: 所有配置项支持本地存储,页面刷新后自动恢复
**技术实现**:
- 调整CSS透明度:chatOverlay主背景、消息框、头像背景均设为50%透明度
- 新增JavaScript函数:`toggleChatOverlay`、`updateChatOverlayOpacity`、`updateMessageOpacity`、`loadChatOverlayConfig`
- 动态样式管理:通过JavaScript动态创建CSS样式实现实时透明度调节
- 事件监听器:滑块控件实时响应用户调整,立即应用视觉效果
**界面改进**:
- 对话框配置模块:显示/隐藏开关、对话框透明度滑块、消息框透明度滑块
- 实时反馈:滑块旁显示当前透明度百分比值
- 重置功能:一键恢复所有配置到默认状态
- 隐藏按钮:原清空按钮改为"-"图标,点击隐藏对话框
### 验证结果
- ✅ SessionId现在能够在页面刷新后自动恢复,保持数字人会话的连续性
- ✅ 对话框现在更加透明,不会过度遮挡数字人界面
- ✅ 用户可根据需要灵活调节透明度和显示状态
- ✅ 所有配置项支持持久化存储,提升用户体验
---
## 2025-01-27 - LLM模型信息显示修复 - AIfeng
### 问题修复:页面显示"Unknown LLM"而非实际模型名称
**问题背景:**
- 启用豆包模型后,页面始终显示"Unknown LLM"
- app.py中尝试获取`nerfreals[sessionid].llm.model_name`但获取失败
- LLM处理函数只创建模型实例处理响应,未设置到nerfreal对象
**根因分析:**
1. **缺失LLM实例绑定**
- `llm.py`中的处理函数创建Doubao实例但未赋值给`nerfreal.llm`
- `app.py`中无法通过`nerfreals[sessionid].llm.model_name`获取模型信息
2. **模型名称属性缺失**
- Doubao类缺少`model_name`属性用于页面显示
- 通义千问使用OpenAI客户端,无统一的模型名称接口
**解决方案:**
1. **Doubao类增强**
- 添加`model_name = "豆包大模型"`属性
- 提供统一的模型名称显示接口
2. **LLM实例绑定**
- 在`_handle_doubao_response`中设置`nerfreal.llm = doubao`
- 同时设置`nerfreal.llm_model_name = doubao.model_name`
3. **通义千问兼容**
- 创建QwenWrapper包装类提供`model_name`属性
- 统一模型信息获取机制
**技术实现:**
```python
# Doubao.py - 添加模型名称属性
self.model_name = "豆包大模型" # 添加model_name属性用于页面显示
# llm.py - 绑定LLM实例
doubao = Doubao()
nerfreal.llm = doubao
nerfreal.llm_model_name = doubao.model_name
# 通义千问包装类
class QwenWrapper:
def __init__(self):
self.model_name = "通义千问"
```
**验证结果:**
- ✅ 豆包模型页面正确显示"豆包大模型"
- ✅ 通义千问模型页面正确显示"通义千问"
- ✅ app.py中`getattr(nerfreals[sessionid], 'llm_model_name', 'Unknown LLM')`正常工作
- ✅ `nerfreals[sessionid].llm.model_name`属性访问成功
- ✅ 模型信息在WebSocket消息中正确传递到前端
---
## 2024-12-19 WebSocket通信机制修正
**AIfeng/2024-12-19**
### 问题描述
用户指出前期实现存在误解,不应该通过HTTP接口返回数据来获取消息内容,而是完全通过WebSocket通信同步数据。需要修正代码,确保所有消息数据都通过WebSocket推送。
### 修复方案
1. **纯WebSocket通信**:移除HTTP响应中的消息数据返回,只保留简单的处理状态
2. **统一数据流**:所有消息显示完全依赖WebSocket推送,不再从HTTP响应获取任何消息数据
3. **简化响应格式**:HTTP接口只返回处理状态,不包含具体的消息内容
4. **保持错误处理**:网络错误仍通过前端直接处理,服务器错误通过WebSocket推送
### 技术实现
**后端修改** (`e:\fengyang\eman_one\app.py`):
- 简化`/human`接口返回格式,只包含`code`和`message`状态信息
- 移除HTTP响应中的`user_message`、`ai_response`、`model_info`等数据字段
- 保持WebSocket推送机制不变,所有消息数据通过WebSocket传输
**前端修改** (`e:\fengyang\eman_one\web\webrtcapichat.html`):
- 移除对HTTP响应数据的处理和界面显示逻辑
- 保留网络错误的本地处理机制
- 所有消息显示完全依赖WebSocket推送的`chat_message`类型数据
- 简化HTTP响应处理,只检查处理状态
### 验证结果
- ✅ HTTP接口不再返回消息数据,只返回处理状态
- ✅ 所有消息显示完全通过WebSocket推送实现
- ✅ 前端不再依赖HTTP响应获取消息内容
- ✅ 网络错误处理机制保持正常
- ✅ WebSocket推送机制保持完整功能
- ✅ 实现了纯WebSocket数据通信架构
---
## 2025-06-26 - AIfeng
### 问题修复:LLM模块导入错误
**问题描述:**
- `ImportError: cannot import name 'llm_response' from 'llm'`
- app.py无法从llm包中导入llm_response函数
**修复方案:**
- 修改 `llm/__init__.py` 文件,添加llm_response函数的正确导入
- 使用importlib.util动态加载llm.py模块,避免循环导入问题
- 更新__all__列表,确保llm_response函数正确导出
**技术实现:**
```python
# 使用importlib.util动态导入
import importlib.util
spec = importlib.util.spec_from_file_location("llm_module", os.path.join(parent_dir, "llm.py"))
llm_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(llm_module)
llm_response = llm_module.llm_response
```
**验证结果:**
- ✅ `from llm import llm_response` 导入成功
- ✅ app.py可以正常导入llm_response函数
- ✅ 豆包模型集成功能完全可用
---
# 2024-12-19 代码质量与可维护性增强建议
## 新增文件
- `doc/dev/code_quality_enhancement.md` - 代码质量与可维护性增强建议文档
## 功能增强
### 架构优化建议
- **依赖注入模式**: 实现LLM容器管理,解耦模型选择逻辑
- **策略模式重构**: 替换if-elif条件判断,提升代码可扩展性
- **配置管理中心化**: 统一配置管理器,支持热重载和变更监听
### 代码质量提升
- **类型注解完善**: 全面添加类型提示,提升代码可读性
- **错误处理标准化**: 自定义异常类型和错误处理链
- **日志系统增强**: 结构化日志和链路追踪
### 测试策略完善
- **单元测试覆盖**: 完整的测试套件和Mock策略
- **集成测试自动化**: 模型切换和配置热重载测试
### 性能监控与优化
- **性能指标收集**: 响应时间、令牌速率、内存使用监控
- **缓存策略**: LRU缓存和智能缓存键生成
### 安全性增强
- **敏感信息保护**: API密钥加密存储
- **输入验证和清理**: XSS防护和输入长度限制
### 文档和规范
- **API文档自动生成**: FastAPI集成和Swagger文档
- **代码规范检查**: pre-commit钩子和自动化检查
### 部署和运维
- **容器化部署**: Docker配置和健康检查
- **监控和告警**: 系统健康检查和故障预警
## 实施优先级
- **高优先级**: 类型注解、错误处理、单元测试、输入验证
- **中优先级**: 策略模式、配置管理、性能监控、缓存策略
- **低优先级**: 依赖注入、微服务架构、分布式缓存、自动化运维
## 技术价值
- 🏗️ 提升架构设计质量
- 🔧 增强代码可维护性
- 🧪 完善测试覆盖率
- 📊 强化性能监控
- 🔒 提升安全保障
- 📚 完善文档体系
- 🚀 优化部署运维
---
## 2024-12-19
### WebSocket双向通信系统实现
**问题描述:**
- `/human` 接口处理的消息无法实时推送到前端页面
- 第三方调用 `/human` 接口时,页面无法获得反馈
- 缺乏页面与服务端的实时通信机制
**解决方案:**
1. **后端WebSocket服务器实现**
- 在 `app.py` 中添加 WebSocket 支持(aiohttp)
- 实现会话级连接管理(`websocket_connections`)
- 添加消息推送函数 `broadcast_message_to_session`
- 支持心跳检测和自动重连机制
2. **前端WebSocket客户端优化**
- 修改 `webrtcapichat.html` 连接到新的 `/ws` 端点
- 实现会话登录和消息类型处理
- 添加聊天消息推送的实时显示
3. **消息推送集成**
- 修改 `human` 函数,在处理消息后推送到WebSocket
- 支持 `chat` 和 `echo` 两种消息类型的推送
- 区分用户消息、AI回复和回音消息的显示
**技术实现:**
- **后端架构**:aiohttp WebSocket + weakref连接管理
- **消息格式**:JSON格式,包含类型、会话ID、内容和来源
- **连接管理**:基于sessionid的会话级连接池
- **错误处理**:连接异常自动清理和日志记录
**核心功能:**
- ✅ 实时双向通信:页面↔服务端
- ✅ 会话级消息推送:支持多用户并发
- ✅ 消息类型区分:用户/AI助手/回音
- ✅ 连接状态管理:自动重连和心跳检测
- ✅ 第三方接口支持:外部调用也能推送到页面
**测试工具:**
- 创建 `websocket_test.html` 测试页面
- 支持连接状态监控和消息发送测试
- 实时日志显示和错误诊断
**技术价值:**
- 🚀 **实时性提升**:消息即时推送,无需轮询
- 🔄 **双向通信**:支持服务端主动推送
- 🎯 **精准推送**:基于会话ID的定向消息
- 🛡️ **稳定性增强**:自动重连和异常处理
- 🔧 **扩展性强**:支持未来更多消息类型
# 2024-12-19 WebRTC Chat页面消息类型支持修复
## 问题描述
- `webrtcapichat.html`中使用"type": "chat"的模型对话数据没有被收录到chatOverlay
- 表单提交时固定发送"type": "echo",无法发送chat类型消息
- WebSocket消息处理逻辑未正确识别chat类型回复
## 修复内容
### 1. 添加消息类型选择器
- 在文本输入表单中添加消息类型下拉选择框
- 支持"智能对话"(chat)和"回音模式"(echo)两种类型
- 默认选择为"智能对话"模式
### 2. 修改表单提交逻辑
- 获取用户选择的消息类型,替代固定的'echo'类型
- 动态发送用户选择的消息类型到服务器
### 3. 优化WebSocket消息处理
- 增加对`messageData.Data.Type`字段的检查
- 根据服务器返回的消息类型正确设置显示模式
- 添加调试日志输出,便于问题排查
- 修复TTS推送时使用原始消息类型而非固定echo类型
## 技术实现
### HTML表单增强
```html
<div class="form-group">
<label for="message-type">消息类型</label>
<select class="form-control" id="message-type">
<option value="chat">智能对话</option>
<option value="echo">回音模式</option>
</select>
</div>
```
### JavaScript逻辑优化
```javascript
// 动态获取消息类型
var messageType = document.getElementById('message-type') ?
document.getElementById('message-type').value : 'chat';
// 消息模式判断增强
if (messageData.Data.Mode) {
mode = messageData.Data.Mode;
} else if (messageData.Data.Type) {
mode = messageData.Data.Type;
} else {
// 启发式判断逻辑
}
```
## 修复效果
- ✅ 用户可以选择发送chat或echo类型消息
- ✅ chat类型的大模型回复正确显示在chatOverlay中
- ✅ 不同消息类型有对应的视觉标识(头像和颜色)
- ✅ 调试信息完善,便于后续维护
## 兼容性说明
- 保持向后兼容,默认消息类型为chat
- 原有echo模式功能不受影响
- 支持服务器端返回的Mode和Type字段
---
## 2024-12-19 - AIfeng
### 功能增强:豆包大模型集成与配置化改造
**新增文件:**
- `llm/Doubao.py` - 豆包大模型API接口实现
- `config/doubao_config.json` - 豆包模型详细配置文件
- `config/llm_config.json` - LLM模型统一配置管理
**文件修改:**
- `llm.py` - 重构LLM响应函数,支持多模型切换
**新增功能:**
1. **豆包模型集成**
- 基于火山引擎豆包API实现对话功能
- 支持流式和非流式响应模式
- 完整的错误处理和异常捕获机制
- 支持自定义API密钥和基础URL配置
2. **配置化架构设计**
- 人物设定完全配置化(性格、背景、对话风格等)
- API参数可配置(模型名称、温度、top_p、最大token等)
- 响应行为配置(流式传输、重试机制、超时设置)
- 高级功能配置(安全过滤、内容审核、日志记录)
3. **多模型统一管理**
- 通过`llm_config.json`统一管理模型选择
- 支持通义千问和豆包模型无缝切换
- 保持原有通义千问功能完整性
- 统一的性能监控和日志记录
4. **流式响应优化**
- 豆包模型支持实时流式输出
- 智能分句处理,提升用户体验
- 首个token时间和总响应时间监控
- 回调函数机制支持自定义处理逻辑
5. **配置文件结构**
- `doubao_config.json`:包含API配置、人物设定、响应配置等
- `llm_config.json`:模型选择和通用设置
- 支持环境变量和配置文件双重配置方式
**技术实现:**
- 重构`llm_response`函数为模块化架构
- 新增`_load_llm_config`、`_handle_doubao_response`、`_handle_qwen_response`函数
- 实现豆包API的HTTP请求封装和流式处理
- 配置文件JSON格式化和错误处理机制
- 性能监控和详细日志记录
**配置示例:**
```json
{
"model_type": "doubao",
"models": {
"doubao": {
"config_file": "config/doubao_config.json"
}
}
}
```
**兼容性:**
- 完全向后兼容原有通义千问配置
- 支持动态模型切换,无需重启服务
- 保持原有API接口不变
---
## 2024-12-19 - AIfeng
### 功能增强:数字人对话记录系统
**文件修改:**
- `web/webrtcapichat.html` - 增强数字人对话页面
**新增功能:**
1. **对话框界面优化**
- 在数字人视频右下角添加完整的对话记录框
- 增加聊天框头部显示"数字人对话记录"
- 添加清空对话记录按钮
- 优化消息显示样式,支持消息动画效果
2. **消息来源标注**
- 用户输入:标注为"用户输入"或"用户语音"
- 数字人回复:标注为"数字人回复"
- 支持不同模式的头像颜色区分(回声模式、对话模式、纯文本模式)
3. **多种输入模式支持**
- 文字输入:通过文本框发送消息
- 语音输入:通过录音功能发送语音消息
- 自动识别并标注输入类型
4. **智能模式识别**
- 回声模式:数字人复述用户输入
- 对话模式:大模型生成回复内容
- 纯文本模式:直接文本显示
- 基于消息内容的启发式模式判断
5. **本地存储功能**
- 自动保存对话记录到浏览器本地存储
- 页面刷新后自动恢复历史对话
- 支持手动清空对话记录
6. **用户体验优化**
- 消息自动滚动到底部
- 空消息输入验证
- 消息时间戳显示
- 响应式布局适配
**技术实现:**
- 扩展 `addMessage` 函数支持来源和模式参数
- 新增 `clearChatHistory`、`saveChatHistory`、`loadChatHistory` 函数
- 优化 WebSocket 消息处理逻辑
- 增强 CSS 样式支持动画和多状态显示
**兼容性:**
- 保持原有 WebRTC 功能完整性
- 向后兼容现有 API 接口
- 支持所有主流浏览器
---
## WebRTC连接优化更新日志
### 问题描述
用户反映WebRTC连接状态显示"Connection state is connecting"但连接时长很长,需要分析和优化连接建立过程。
### 根因分析
通过代码分析发现连接延迟可能由以下原因造成:
1. 模型初始化过程耗时过长(ernerf/musetalk/wav2lip/ultralight模型加载)
2. ICE候选者收集和连接建立过程缺乏监控
3. 音视频轨道初始化缺乏性能监控
4. SDP协商过程缺乏时间追踪
### 优化措施
#### 1. 连接状态监控增强
- 在`app.py`的`on_connectionstatechange`函数中添加详细的时间戳记录
- 增加SessionID标识,便于多会话调试
- 添加连接状态变化的详细日志(connecting/connected/failed/closed)
- 改进错误处理,避免重复删除会话
#### 2. ICE连接监控
- 新增`on_iceconnectionstatechange`事件监听器
- 监控ICE连接状态变化(checking/connected/completed/failed/disconnected)
- 新增`on_icegatheringstatechange`事件监听器
- 监控ICE候选者收集过程(gathering/complete)
#### 3. 模型初始化优化
- 在`build_nerfreal`函数中添加详细的加载时间监控
- 为每种模型类型(Wav2Lip/MuseTalk/ERNeRF/UltraLight)添加专门的日志
- 增强错误处理和资源清理机制
- 添加垃圾回收以优化内存使用
#### 4. 音视频轨道初始化监控
- 监控HumanPlayer创建时间
- 监控音频轨道和视频轨道添加时间
- 记录整个音视频初始化过程的总耗时
#### 5. 编解码器配置监控
- 监控视频编解码器配置过程
- 记录可用编解码器列表(H264/VP8/rtx)
- 监控编解码器偏好设置时间
#### 6. SDP协商过程监控
- 监控SDP协商的完整过程
- 记录远程描述设置、应答创建、本地描述设置的各个阶段
- 计算SDP协商总耗时
### 技术实现细节
#### 时间戳格式
所有时间戳使用`time.time()`获取,精确到毫秒(%.3f格式)
#### 日志格式标准化
```
[SessionID:XXXXXX] 操作描述 at 时间戳
[SessionID:XXXXXX] 操作描述 in X.XXX seconds
```
#### 错误处理改进
- 使用try-catch包装模型初始化过程
- 添加资源清理机制
- 避免重复删除会话导致的KeyError
### 预期效果
1. **问题定位精确化**:通过详细的时间戳记录,可以精确定位连接建立过程中的瓶颈
2. **性能监控可视化**:各个阶段的耗时记录有助于识别性能热点
3. **调试效率提升**:SessionID标识和结构化日志便于多会话并发调试
4. **系统稳定性增强**:改进的错误处理和资源清理机制
### 后续优化建议
1. **模型预加载**:考虑在服务启动时预加载常用模型
2. **连接池优化**:实现模型实例复用机制
3. **网络配置优化**:优化STUN/TURN服务器配置
4. **异步初始化**:将模型初始化与WebRTC连接建立并行处理
## 2024-01-XX STUN服务器优化更新
### 优化背景
基于日志分析发现ICE候选者收集延迟(5秒+)是主要性能瓶颈,需要优化STUN服务器配置。
### 技术实现
#### 1. 多STUN服务器配置
- 添加Google多个STUN服务器节点
- 实现负载均衡和故障转移
- 配置ICE候选者池大小优化
#### 2. ICE收集超时机制
- 设置3秒超时限制
- 避免无限等待ICE收集完成
- 提供降级处理方案
#### 3. 连接状态监控增强
- 添加实时ICE状态显示
- 提供用户友好的连接状态反馈
- 增强调试和问题定位能力
### 文件变更记录
- 修改文件:`e:\fengyang\eman_one\app.py`
- 变更类型:功能增强、性能监控、错误处理改进
- 影响范围:WebRTC连接建立流程、模型初始化流程
- `web/client.js`: 优化STUN配置,添加超时机制和状态监控
- `web/whep.js`: 同步STUN服务器配置优化
### 预期效果
- ICE收集时间从5秒降低到1-2秒
- 总连接时间减少50-60%
- 提升用户连接体验
### 测试建议
1. 启动服务后观察日志输出格式
2. 建立WebRTC连接,记录各阶段耗时
3. 模拟网络延迟环境测试ICE连接过程
4. 测试多会话并发场景下的日志区分度
\ No newline at end of file
... ...
# -*- coding: utf-8 -*-
"""
AIfeng/2025-01-27
FunASR语音识别模块
基于BaseASR的FunASR WebSocket客户端实现
"""
import json
import time
import asyncio
import websockets
import threading
import numpy as np
from threading import Thread, Event
from typing import Optional, Callable
import queue
from baseasr import BaseASR
import config_util as cfg
import util
class FunASRClient(BaseASR):
"""FunASR WebSocket客户端"""
def __init__(self, opt, parent=None):
# 确保opt有必要的属性
if not hasattr(opt, 'fps'):
opt.fps = 50 # 默认50fps
if not hasattr(opt, 'batch_size'):
opt.batch_size = 1
if not hasattr(opt, 'l'):
opt.l = 10
if not hasattr(opt, 'r'):
opt.r = 10
super().__init__(opt, parent)
# FunASR配置
self.server_url = f"ws://{cfg.local_asr_ip}:{cfg.local_asr_port}"
self.username = getattr(opt, 'username', 'default_user')
# 连接状态
self.websocket = None
self.connected = False
self.running = False
self.reconnect_delay = getattr(cfg, 'asr_reconnect_delay', 1)
self.max_reconnect_attempts = getattr(cfg, 'asr_max_reconnect_attempts', 5)
# 消息队列
self.message_queue = queue.Queue()
self.result_queue = queue.Queue()
# 线程控制
self.connection_thread = None
self.message_thread = None
self.stop_event = Event()
# 回调函数
self.on_result_callback = None
util.log(1, f"FunASR客户端初始化完成,服务器: {self.server_url}")
def set_result_callback(self, callback: Callable[[str], None]):
"""设置识别结果回调函数
Args:
callback: 回调函数,接收识别结果字符串
"""
self.on_result_callback = callback
async def _connect_websocket(self):
"""连接WebSocket服务器"""
try:
self.websocket = await websockets.connect(
self.server_url,
timeout=getattr(cfg, 'asr_timeout', 30)
)
self.connected = True
util.log(1, f"FunASR WebSocket连接成功: {self.server_url}")
return True
except Exception as e:
util.log(3, f"FunASR WebSocket连接失败: {e}")
self.connected = False
return False
async def _disconnect_websocket(self):
"""断开WebSocket连接"""
if self.websocket:
try:
await self.websocket.close()
except Exception as e:
util.log(2, f"关闭WebSocket连接时出错: {e}")
finally:
self.websocket = None
self.connected = False
async def _send_message(self, message: dict):
"""发送消息到FunASR服务器
Args:
message: 要发送的消息字典
"""
if not self.connected or not self.websocket:
util.log(2, "WebSocket未连接,无法发送消息")
return False
try:
await self.websocket.send(json.dumps(message))
return True
except Exception as e:
util.log(3, f"发送消息失败: {e}")
self.connected = False
return False
async def _receive_messages(self):
"""接收WebSocket消息"""
while self.connected and self.websocket:
try:
message = await asyncio.wait_for(
self.websocket.recv(),
timeout=1.0
)
self._handle_recognition_result(message)
except asyncio.TimeoutError:
continue
except websockets.exceptions.ConnectionClosed:
util.log(2, "WebSocket连接已关闭")
self.connected = False
break
except Exception as e:
util.log(3, f"接收消息时出错: {e}")
self.connected = False
break
async def _send_message_loop(self):
"""发送消息循环"""
while self.connected and self.websocket:
try:
# 检查消息队列
try:
message = self.message_queue.get_nowait()
if isinstance(message, dict):
# JSON消息
await self.websocket.send(json.dumps(message))
util.log(1, f"发送JSON消息: {message}")
elif isinstance(message, bytes):
# 二进制音频数据
await self.websocket.send(message)
util.log(1, f"发送音频数据: {len(message)} bytes")
else:
util.log(2, f"未知消息类型: {type(message)}")
except queue.Empty:
# 队列为空,短暂等待
await asyncio.sleep(0.01)
except websockets.exceptions.ConnectionClosed:
util.log(2, "发送消息时连接已关闭")
self.connected = False
break
except Exception as e:
util.log(3, f"发送消息时出错: {e}")
self.connected = False
break
def _handle_recognition_result(self, message: str):
"""处理识别结果
Args:
message: 识别结果消息
"""
try:
# 尝试解析JSON
try:
result_data = json.loads(message)
if isinstance(result_data, dict) and 'text' in result_data:
recognized_text = result_data['text']
else:
recognized_text = message
except json.JSONDecodeError:
recognized_text = message
# 存储结果
self.result_queue.put(recognized_text)
# 调用回调函数
if self.on_result_callback:
self.on_result_callback(recognized_text)
# 发送到WebSocket服务器(兼容原有逻辑)
self._send_to_web_clients(recognized_text)
util.log(1, f"识别结果: {recognized_text}")
except Exception as e:
util.log(3, f"处理识别结果时出错: {e}")
def _send_to_web_clients(self, text: str):
"""发送识别结果到Web客户端
Args:
text: 识别文本
"""
try:
from core import wsa_server
# 发送到Web客户端
if wsa_server.get_web_instance().is_connected(self.username):
wsa_server.get_web_instance().add_cmd({
"panelMsg": text,
"Username": self.username
})
# 发送到Human客户端
if wsa_server.get_instance().is_connected_human(self.username):
content = {
'Topic': 'human',
'Data': {'Key': 'log', 'Value': text},
'Username': self.username
}
wsa_server.get_instance().add_cmd(content)
except Exception as e:
util.log(2, f"发送到Web客户端失败: {e}")
async def _connection_loop(self):
"""连接循环,处理重连逻辑"""
reconnect_attempts = 0
while self.running and not self.stop_event.is_set():
if not self.connected:
util.log(1, f"尝试连接FunASR服务器 (第{reconnect_attempts + 1}次)")
if await self._connect_websocket():
reconnect_attempts = 0
# 启动消息处理任务
receive_task = asyncio.create_task(self._receive_messages())
send_task = asyncio.create_task(self._send_message_loop())
# 等待任务完成或连接断开
try:
await asyncio.gather(receive_task, send_task)
except Exception as e:
util.log(3, f"连接任务异常: {e}")
finally:
receive_task.cancel()
send_task.cancel()
else:
reconnect_attempts += 1
if reconnect_attempts >= self.max_reconnect_attempts:
util.log(3, f"达到最大重连次数({self.max_reconnect_attempts}),停止重连")
break
# 等待后重连
await asyncio.sleep(self.reconnect_delay)
self.reconnect_delay = min(self.reconnect_delay * 2, 30) # 指数退避
else:
await asyncio.sleep(0.1)
await self._disconnect_websocket()
def _run_async_loop(self):
"""在独立线程中运行异步事件循环"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(self._connection_loop())
except Exception as e:
util.log(3, f"异步循环出错: {e}")
finally:
loop.close()
def start(self):
"""启动FunASR客户端"""
if self.running:
util.log(2, "FunASR客户端已在运行")
return
self.running = True
self.stop_event.clear()
# 启动连接线程
self.connection_thread = Thread(target=self._run_async_loop, daemon=True)
self.connection_thread.start()
util.log(1, "FunASR客户端已启动")
def stop(self):
"""停止FunASR客户端"""
if not self.running:
return
util.log(1, "正在停止FunASR客户端...")
self.running = False
self.stop_event.set()
# 等待线程结束
if self.connection_thread and self.connection_thread.is_alive():
self.connection_thread.join(timeout=5)
util.log(1, "FunASR客户端已停止")
def send_audio_file(self, file_path: str):
"""发送音频文件进行识别
Args:
file_path: 音频文件路径
"""
if not self.connected:
util.log(2, "WebSocket未连接,无法发送音频文件")
return False
message = {"url": file_path}
# 将消息放入队列,由异步线程处理
self.message_queue.put(message)
return True
def send_audio(self, audio_data: bytes):
"""发送音频数据进行识别
Args:
audio_data: 音频字节数据
"""
if not self.connected:
util.log(2, "WebSocket未连接,无法发送音频数据")
return False
# 将音频数据放入队列
self.message_queue.put(audio_data)
return True
def start_recognition(self):
"""开始语音识别"""
if not self.connected:
self.start()
# 发送开始识别消息
start_message = {
'vad_need': False,
'state': 'StartTranscription'
}
self.message_queue.put(start_message)
util.log(1, "开始语音识别")
def stop_recognition(self):
"""停止语音识别"""
if not self.connected:
return
# 发送停止识别消息
stop_message = {
'vad_need': False,
'state': 'StopTranscription'
}
self.message_queue.put(stop_message)
util.log(1, "停止语音识别")
def get_latest_result(self, timeout: float = 0.1) -> Optional[str]:
"""获取最新的识别结果
Args:
timeout: 超时时间
Returns:
识别结果字符串或None
"""
try:
return self.result_queue.get(timeout=timeout)
except queue.Empty:
return None
def warm_up(self):
"""预热模型"""
super().warm_up()
self.start()
# 等待连接建立
max_wait = 10 # 最多等待10秒
wait_time = 0
while not self.connected and wait_time < max_wait:
time.sleep(0.1)
wait_time += 0.1
if self.connected:
util.log(1, "FunASR客户端预热完成")
else:
util.log(2, "FunASR客户端预热超时")
def run_step(self):
"""运行一步处理"""
# 处理待发送的消息
try:
while not self.message_queue.empty():
message = self.message_queue.get_nowait()
# 这里需要通过某种方式发送到异步线程
# 简化实现:直接记录日志
util.log(1, f"准备发送消息: {message}")
except queue.Empty:
pass
# 调用父类方法
super().run_step()
def get_next_feat(self, block=True, timeout=None):
"""获取下一个特征
Args:
block: 是否阻塞
timeout: 超时时间
Returns:
特征数据
"""
# 简化实现,返回空特征
return np.zeros((1, 50), dtype=np.float32)
def __del__(self):
"""析构函数"""
self.stop()
# 兼容性别名
FunASR = FunASRClient
\ No newline at end of file
... ...
... ... @@ -49,6 +49,9 @@ def _handle_doubao_response(message, nerfreal, start_time):
from llm.Doubao import Doubao
doubao = Doubao()
# 设置LLM实例到nerfreal对象,用于页面显示模型信息
nerfreal.llm = doubao
nerfreal.llm_model_name = doubao.model_name
end = time.perf_counter()
logger.info(f"豆包模型初始化时间: {end-start_time:.3f}s")
... ... @@ -103,6 +106,15 @@ def _handle_qwen_response(message, nerfreal, start_time):
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
# 创建模型信息包装类
class QwenWrapper:
def __init__(self):
self.model_name = "通义千问"
# 设置LLM实例到nerfreal对象,用于页面显示模型信息
nerfreal.llm = QwenWrapper()
nerfreal.llm_model_name = "通义千问"
end = time.perf_counter()
logger.info(f"通义千问初始化时间: {end-start_time:.3f}s")
... ...
... ... @@ -22,6 +22,7 @@ class Doubao:
self.api_key = os.getenv("DOUBAO_API_KEY") or self.config.get("api_key")
self.base_url = self.config.get("base_url", "https://ark.cn-beijing.volces.com/api/v3")
self.model = self.config.get("model", "ep-20241219000000-xxxxx")
self.model_name = "豆包大模型" # 添加model_name属性用于页面显示
self.character_config = self.config.get("character", {})
if not self.api_key:
... ...
# AIfeng/2025-01-27
"""
FunASR服务连接测试脚本
用于验证本地FunASR WebSocket服务是否可以正常连接
使用方法:
1. 先启动FunASR服务:python -u web/asr/funasr/ASR_server.py --host "127.0.0.1" --port 10197 --ngpu 0
2. 运行此测试脚本:python test_funasr_connection.py
"""
import asyncio
import websockets
import json
import os
import wave
import numpy as np
from pathlib import Path
class FunASRConnectionTest:
def __init__(self, host="127.0.0.1", port=10197):
self.host = host
self.port = port
self.uri = f"ws://{host}:{port}"
async def test_basic_connection(self):
"""测试基本WebSocket连接"""
print(f"🔍 测试连接到 {self.uri}")
try:
async with websockets.connect(self.uri) as websocket:
print("✅ FunASR WebSocket服务连接成功")
return True
except ConnectionRefusedError:
print("❌ 连接被拒绝,请确认FunASR服务已启动")
print(" 启动命令: python -u web/asr/funasr/ASR_server.py --host \"127.0.0.1\" --port 10197 --ngpu 0")
return False
except Exception as e:
print(f"❌ 连接失败: {e}")
return False
def create_test_wav(self, filename="test_audio.wav", duration=2, sample_rate=16000):
"""创建测试用的WAV文件"""
# 生成简单的正弦波音频
t = np.linspace(0, duration, int(sample_rate * duration), False)
frequency = 440 # A4音符
audio_data = np.sin(2 * np.pi * frequency * t) * 0.3
# 转换为16位整数
audio_data = (audio_data * 32767).astype(np.int16)
# 保存为WAV文件
with wave.open(filename, 'wb') as wav_file:
wav_file.setnchannels(1) # 单声道
wav_file.setsampwidth(2) # 16位
wav_file.setframerate(sample_rate)
wav_file.writeframes(audio_data.tobytes())
print(f"📁 创建测试音频文件: {filename}")
return filename
async def test_audio_recognition(self):
"""测试音频识别功能"""
print("\n🎵 测试音频识别功能")
# 创建测试音频文件
test_file = self.create_test_wav()
test_file_path = os.path.abspath(test_file)
try:
async with websockets.connect(self.uri) as websocket:
print("✅ 连接成功,发送音频文件路径")
# 发送音频文件路径
message = {"url": test_file_path}
await websocket.send(json.dumps(message))
print(f"📤 发送消息: {message}")
# 等待识别结果
try:
response = await asyncio.wait_for(websocket.recv(), timeout=10)
print(f"📥 收到识别结果: {response}")
return True
except asyncio.TimeoutError:
print("⏰ 等待响应超时(10秒)")
print(" 这可能是正常的,因为测试音频是纯音调,无法识别为文字")
return True # 超时也算连接成功
except Exception as e:
print(f"❌ 音频识别测试失败: {e}")
return False
finally:
# 清理测试文件
if os.path.exists(test_file):
os.remove(test_file)
print(f"🗑️ 清理测试文件: {test_file}")
async def test_real_audio_files(self):
"""测试实际音频文件的识别效果"""
print("\n🎤 测试实际音频文件识别")
# 实际音频文件列表
audio_files = [
"yunxi.mp3",
"yunxia.mp3",
"yunyang.mp3"
]
results = []
for audio_file in audio_files:
file_path = os.path.abspath(audio_file)
# 检查文件是否存在
if not os.path.exists(file_path):
print(f"⚠️ 音频文件不存在: {file_path}")
continue
print(f"\n🎵 测试音频文件: {audio_file}")
try:
async with websockets.connect(self.uri) as websocket:
print(f"✅ 连接成功,发送音频文件: {audio_file}")
# 发送音频文件路径
message = {"url": file_path}
await websocket.send(json.dumps(message))
print(f"📤 发送消息: {message}")
# 等待识别结果
try:
response = await asyncio.wait_for(websocket.recv(), timeout=30)
print(f"📥 识别结果: {response}")
# 解析响应
try:
result_data = json.loads(response)
if isinstance(result_data, dict) and 'text' in result_data:
recognized_text = result_data['text']
print(f"🎯 识别文本: {recognized_text}")
results.append({
'file': audio_file,
'text': recognized_text,
'status': 'success'
})
else:
print(f"📄 原始响应: {response}")
results.append({
'file': audio_file,
'response': response,
'status': 'received'
})
except json.JSONDecodeError:
print(f"📄 非JSON响应: {response}")
results.append({
'file': audio_file,
'response': response,
'status': 'received'
})
except asyncio.TimeoutError:
print(f"⏰ 等待响应超时(30秒)- {audio_file}")
results.append({
'file': audio_file,
'status': 'timeout'
})
except Exception as e:
print(f"❌ 测试 {audio_file} 失败: {e}")
results.append({
'file': audio_file,
'error': str(e),
'status': 'error'
})
# 文件间等待,避免服务器压力
await asyncio.sleep(1)
# 输出测试总结
print("\n" + "="*50)
print("📊 实际音频文件测试总结:")
for i, result in enumerate(results, 1):
print(f"\n{i}. 文件: {result['file']}")
if result['status'] == 'success':
print(f" ✅ 识别成功: {result['text']}")
elif result['status'] == 'received':
print(f" 📥 收到响应: {result.get('response', 'N/A')}")
elif result['status'] == 'timeout':
print(f" ⏰ 响应超时")
elif result['status'] == 'error':
print(f" ❌ 测试失败: {result.get('error', 'N/A')}")
return len(results) > 0
async def test_message_format(self):
"""测试消息格式兼容性"""
print("\n📋 测试消息格式兼容性")
try:
async with websockets.connect(self.uri) as websocket:
# 测试不同的消息格式
test_messages = [
{"url": "nonexistent.wav"},
{"test": "message"},
"invalid_json"
]
for i, msg in enumerate(test_messages, 1):
try:
if isinstance(msg, dict):
await websocket.send(json.dumps(msg))
print(f"✅ 消息 {i} 发送成功: {msg}")
else:
await websocket.send(msg)
print(f"✅ 消息 {i} 发送成功: {msg}")
# 短暂等待,避免消息堆积
await asyncio.sleep(0.5)
except Exception as e:
print(f"⚠️ 消息 {i} 发送失败: {e}")
return True
except Exception as e:
print(f"❌ 消息格式测试失败: {e}")
return False
def check_dependencies(self):
"""检查依赖项"""
print("🔍 检查依赖项...")
required_modules = [
'websockets',
'asyncio',
'json',
'wave',
'numpy'
]
missing_modules = []
for module in required_modules:
try:
__import__(module)
print(f"✅ {module}")
except ImportError:
print(f"❌ {module} (缺失)")
missing_modules.append(module)
if missing_modules:
print(f"\n⚠️ 缺失依赖项: {', '.join(missing_modules)}")
print("安装命令: pip install " + ' '.join(missing_modules))
return False
print("✅ 所有依赖项检查通过")
return True
def check_funasr_server_file(self):
"""检查FunASR服务器文件是否存在"""
print("\n📁 检查FunASR服务器文件...")
server_path = Path("web/asr/funasr/ASR_server.py")
if server_path.exists():
print(f"✅ 找到服务器文件: {server_path.absolute()}")
return True
else:
print(f"❌ 未找到服务器文件: {server_path.absolute()}")
print(" 请确认文件路径是否正确")
return False
async def run_all_tests(self):
"""运行所有测试"""
print("🚀 开始FunASR连接测试\n")
# 检查依赖
if not self.check_dependencies():
return False
# 检查服务器文件
if not self.check_funasr_server_file():
return False
# 基本连接测试
print("\n" + "="*50)
if not await self.test_basic_connection():
return False
# 音频识别测试
print("\n" + "="*50)
if not await self.test_audio_recognition():
return False
# 实际音频文件测试
print("\n" + "="*50)
await self.test_real_audio_files()
# 消息格式测试
print("\n" + "="*50)
if not await self.test_message_format():
return False
print("\n" + "="*50)
print("🎉 所有测试完成!FunASR服务连接正常")
print("\n💡 集成建议:")
print(" 1. 服务使用WebSocket协议,非gRPC")
print(" 2. 默认监听端口: 10197")
print(" 3. 消息格式: JSON字符串,包含'url'字段指向音频文件路径")
print(" 4. 可以集成到现有项目的ASR模块中")
return True
async def main():
"""主函数"""
tester = FunASRConnectionTest()
success = await tester.run_all_tests()
if not success:
print("\n❌ 测试失败,请检查FunASR服务状态")
return 1
return 0
if __name__ == "__main__":
try:
exit_code = asyncio.run(main())
exit(exit_code)
except KeyboardInterrupt:
print("\n⏹️ 测试被用户中断")
exit(1)
except Exception as e:
print(f"\n💥 测试过程中发生错误: {e}")
exit(1)
\ No newline at end of file
... ...
# -*- coding: utf-8 -*-
"""
AIfeng/2025-01-27
FunASR集成测试脚本
测试新的FunASRClient与项目的集成效果
"""
import os
import sys
import time
import threading
from pathlib import Path
# 添加项目路径
sys.path.append(os.path.dirname(__file__))
from funasr_asr import FunASRClient
from web.asr.funasr import FunASR
import util
class TestFunASRIntegration:
"""FunASR集成测试类"""
def __init__(self):
self.test_results = []
self.test_audio_files = [
"yunxi.mp3",
"yunxia.mp3",
"yunyang.mp3"
]
def log_test_result(self, test_name: str, success: bool, message: str = ""):
"""记录测试结果"""
status = "✓ 通过" if success else "✗ 失败"
result = f"[{status}] {test_name}"
if message:
result += f" - {message}"
self.test_results.append((test_name, success, message))
print(result)
def test_funasr_client_creation(self):
"""测试FunASRClient创建"""
try:
class SimpleOpt:
def __init__(self):
self.username = "test_user"
opt = SimpleOpt()
client = FunASRClient(opt)
# 检查基本属性
assert hasattr(client, 'server_url')
assert hasattr(client, 'connected')
assert hasattr(client, 'running')
self.log_test_result("FunASRClient创建", True, "客户端创建成功")
return client
except Exception as e:
self.log_test_result("FunASRClient创建", False, f"错误: {e}")
return None
def test_compatibility_wrapper(self):
"""测试兼容性包装器"""
try:
funasr = FunASR("test_user")
# 检查兼容性方法
assert hasattr(funasr, 'start')
assert hasattr(funasr, 'end')
assert hasattr(funasr, 'send')
assert hasattr(funasr, 'add_frame')
assert hasattr(funasr, 'set_message_callback')
self.log_test_result("兼容性包装器", True, "所有兼容性方法存在")
return funasr
except Exception as e:
self.log_test_result("兼容性包装器", False, f"错误: {e}")
return None
def test_callback_mechanism(self):
"""测试回调机制"""
try:
funasr = FunASR("test_user")
callback_called = threading.Event()
received_message = []
def test_callback(message):
received_message.append(message)
callback_called.set()
funasr.set_message_callback(test_callback)
# 模拟接收消息
test_message = "测试识别结果"
funasr._handle_result(test_message)
# 等待回调
if callback_called.wait(timeout=1.0):
if received_message and received_message[0] == test_message:
self.log_test_result("回调机制", True, "回调函数正常工作")
else:
self.log_test_result("回调机制", False, "回调消息不匹配")
else:
self.log_test_result("回调机制", False, "回调超时")
except Exception as e:
self.log_test_result("回调机制", False, f"错误: {e}")
def test_audio_file_existence(self):
"""测试音频文件存在性"""
existing_files = []
missing_files = []
for audio_file in self.test_audio_files:
if os.path.exists(audio_file):
existing_files.append(audio_file)
else:
missing_files.append(audio_file)
if existing_files:
self.log_test_result(
"音频文件检查",
True,
f"找到 {len(existing_files)} 个文件: {', '.join(existing_files)}"
)
if missing_files:
self.log_test_result(
"音频文件缺失",
False,
f"缺少 {len(missing_files)} 个文件: {', '.join(missing_files)}"
)
return existing_files
def test_connection_simulation(self):
"""测试连接模拟"""
try:
client = self.test_funasr_client_creation()
if not client:
return
# 测试启动和停止
client.start()
time.sleep(0.5) # 给连接一些时间
# 检查运行状态
if client.running:
self.log_test_result("客户端启动", True, "客户端成功启动")
else:
self.log_test_result("客户端启动", False, "客户端启动失败")
# 停止客户端
client.stop()
time.sleep(0.5)
if not client.running:
self.log_test_result("客户端停止", True, "客户端成功停止")
else:
self.log_test_result("客户端停止", False, "客户端停止失败")
except Exception as e:
self.log_test_result("连接模拟", False, f"错误: {e}")
def test_message_queue(self):
"""测试消息队列"""
try:
client = self.test_funasr_client_creation()
if not client:
return
# 测试消息入队
test_message = {"test": "message"}
client.message_queue.put(test_message)
# 检查队列
if not client.message_queue.empty():
retrieved_message = client.message_queue.get_nowait()
if retrieved_message == test_message:
self.log_test_result("消息队列", True, "消息队列正常工作")
else:
self.log_test_result("消息队列", False, "消息内容不匹配")
else:
self.log_test_result("消息队列", False, "消息队列为空")
except Exception as e:
self.log_test_result("消息队列", False, f"错误: {e}")
def test_config_loading(self):
"""测试配置加载"""
try:
import config_util as cfg
# 检查关键配置项
required_configs = [
'local_asr_ip',
'local_asr_port',
'asr_timeout',
'asr_reconnect_delay',
'asr_max_reconnect_attempts'
]
missing_configs = []
for config_key in required_configs:
try:
if hasattr(cfg, 'config'):
value = cfg.config.get(config_key)
else:
value = getattr(cfg, config_key, None)
if value is None:
missing_configs.append(config_key)
except:
missing_configs.append(config_key)
if not missing_configs:
self.log_test_result("配置加载", True, "所有必需配置项存在")
else:
self.log_test_result(
"配置加载",
False,
f"缺少配置项: {', '.join(missing_configs)}"
)
except Exception as e:
self.log_test_result("配置加载", False, f"错误: {e}")
def run_all_tests(self):
"""运行所有测试"""
print("\n" + "="*60)
print("FunASR集成测试开始")
print("="*60)
# 运行各项测试
self.test_config_loading()
self.test_funasr_client_creation()
self.test_compatibility_wrapper()
self.test_callback_mechanism()
self.test_message_queue()
self.test_audio_file_existence()
self.test_connection_simulation()
# 输出测试总结
print("\n" + "="*60)
print("测试总结")
print("="*60)
passed_tests = sum(1 for _, success, _ in self.test_results if success)
total_tests = len(self.test_results)
print(f"总测试数: {total_tests}")
print(f"通过测试: {passed_tests}")
print(f"失败测试: {total_tests - passed_tests}")
print(f"成功率: {passed_tests/total_tests*100:.1f}%")
# 显示失败的测试
failed_tests = [(name, msg) for name, success, msg in self.test_results if not success]
if failed_tests:
print("\n失败的测试:")
for name, msg in failed_tests:
print(f" - {name}: {msg}")
print("\n" + "="*60)
return passed_tests == total_tests
def main():
"""主函数"""
tester = TestFunASRIntegration()
success = tester.run_all_tests()
if success:
print("\n🎉 所有测试通过!FunASR集成准备就绪。")
else:
print("\n⚠️ 部分测试失败,请检查相关配置和依赖。")
return 0 if success else 1
if __name__ == "__main__":
exit(main())
\ No newline at end of file
... ...
... ... @@ -90,7 +90,7 @@ class BaseTTS:
###########################################################################################
class EdgeTTS(BaseTTS):
def txt_to_audio(self,msg):
voicename = "zh-CN-XiaoxiaoNeural"
voicename = "zh-CN-YunyangNeural"
text,textevent = msg
t = time.time()
asyncio.new_event_loop().run_until_complete(self.__main(voicename,text))
... ...
# -*- coding: utf-8 -*-
"""
AIfeng/2025-01-27
工具函数模块
提供日志、打印等基础功能
"""
import time
import datetime
from typing import Any
def printInfo(level: int, username: str, message: str):
"""打印信息
Args:
level: 日志级别 (0-DEBUG, 1-INFO, 2-WARN, 3-ERROR)
username: 用户名
message: 消息内容
"""
level_names = ['DEBUG', 'INFO', 'WARN', 'ERROR']
level_name = level_names[min(level, 3)]
timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"[{timestamp}] [{level_name}] [{username}] {message}")
def log(level: int, message: str):
"""记录日志
Args:
level: 日志级别
message: 日志消息
"""
printInfo(level, 'SYSTEM', message)
def get_timestamp() -> str:
"""获取当前时间戳字符串"""
return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
def format_duration(seconds: float) -> str:
"""格式化时长
Args:
seconds: 秒数
Returns:
格式化的时长字符串
"""
if seconds < 60:
return f"{seconds:.2f}秒"
elif seconds < 3600:
minutes = int(seconds // 60)
secs = seconds % 60
return f"{minutes}分{secs:.1f}秒"
else:
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = seconds % 60
return f"{hours}时{minutes}分{secs:.1f}秒"
def safe_print(obj: Any, prefix: str = ""):
"""安全打印对象,避免编码错误
Args:
obj: 要打印的对象
prefix: 前缀字符串
"""
try:
print(f"{prefix}{obj}")
except UnicodeEncodeError:
print(f"{prefix}{repr(obj)}")
except Exception as e:
print(f"{prefix}[打印错误: {e}]")
class Timer:
"""简单的计时器类"""
def __init__(self):
self.start_time = None
self.end_time = None
def start(self):
"""开始计时"""
self.start_time = time.time()
return self
def stop(self):
"""停止计时"""
self.end_time = time.time()
return self
def elapsed(self) -> float:
"""获取经过的时间(秒)"""
if self.start_time is None:
return 0.0
end = self.end_time if self.end_time else time.time()
return end - self.start_time
def elapsed_str(self) -> str:
"""获取格式化的经过时间"""
return format_duration(self.elapsed())
def __enter__(self):
return self.start()
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
\ No newline at end of file
... ...
... ... @@ -276,10 +276,11 @@
margin: 0 6px;
flex-shrink: 0;
border: 1px solid rgba(255,255,255,0.2);
background-color: rgba(255,255,255,0.5);
}
#chatOverlay .text-container {
background-color: rgba(255,255,255,0.95);
background-color: rgba(255,255,255,0.5);
border-radius: 12px;
padding: 8px 12px;
max-width: 75%;
... ... @@ -289,40 +290,40 @@
}
#chatOverlay .message.right .text-container {
background-color: #4285f4;
background-color: rgba(66,133,244,0.5);
color: white;
}
/* 数字人回复样式 - 根据模式区分 */
#chatOverlay .message.left .text-container {
background-color: rgba(248,249,250,0.95);
background-color: rgba(248,249,250,0.5);
border-left: 3px solid #4285f4;
}
/* Echo模式 - 回音重复 */
#chatOverlay .message.left.mode-echo .text-container {
background-color: rgba(255,235,59,0.9);
background-color: rgba(255,235,59,0.5);
border-left: 3px solid #FFC107;
color: #333;
}
/* Chat模式 - 大模型回复 */
#chatOverlay .message.left.mode-chat .text-container {
background-color: rgba(76,175,80,0.9);
background-color: rgba(76,175,80,0.5);
border-left: 3px solid #4CAF50;
color: white;
}
/* Audio模式 - 语音识别回复 */
#chatOverlay .message.left.mode-audio .text-container {
background-color: rgba(156,39,176,0.9);
background-color: rgba(156,39,176,0.5);
border-left: 3px solid #9C27B0;
color: white;
}
/* Plaintext模式 - 纯文本 */
#chatOverlay .message.left.mode-plaintext .text-container {
background-color: rgba(96,125,139,0.9);
background-color: rgba(96,125,139,0.5);
border-left: 3px solid #607D8B;
color: white;
}
... ... @@ -421,24 +422,31 @@
}
/* 响应式适配 */
@media (max-width: 2560px) {
#chatOverlay {
width: min(800px, 40vw) !important;
height: 270px !important;
}
}
/* 响应式适配 */
@media (max-width: 2160px) {
#chatOverlay {
width: min(600px, 32vw) !important;
height: 180px !important;
width: min(800px, 40vw) !important;
height: 270px !important;
}
}
/* 响应式适配 */
@media (max-width: 1200px) {
#chatOverlay {
width: min(400px, 32vw) !important;
width: min(600px, 40vw) !important;
height: 180px !important;
}
}
@media (max-width: 768px) {
#chatOverlay {
width: min(280px, 38vw) !important;
width: min(300px, 40vw) !important;
height: 160px !important;
bottom: 10px !important;
right: 10px !important;
... ... @@ -505,6 +513,25 @@
</div>
<div>
<div class="section-title">对话框配置</div>
<div class="option">
<input id="show-chat-overlay" type="checkbox" checked/>
<label for="show-chat-overlay">显示对话框</label>
</div>
<div class="form-group">
<label for="chat-overlay-opacity">对话框透明度</label>
<input type="range" class="form-control" id="chat-overlay-opacity" min="10" max="90" value="50" step="10">
<small class="form-text text-muted">当前: <span id="opacity-value">50</span>%</small>
</div>
<div class="form-group">
<label for="message-opacity">消息框透明度</label>
<input type="range" class="form-control" id="message-opacity" min="10" max="90" value="50" step="10">
<small class="form-text text-muted">当前: <span id="message-opacity-value">50</span>%</small>
</div>
<button id="reset-chat-config" class="btn btn-secondary">重置配置</button>
</div>
<div>
<div class="section-title">本地存储设置</div>
<div class="option">
<input id="enable-storage" type="checkbox" checked/>
... ... @@ -559,13 +586,13 @@
<video id="video" autoplay="true" playsinline="true"></video>
</div>
<!-- 聊天消息显示区域 -->
<div id="chatOverlay" style="position: absolute; bottom: 15px; right: 15px; width: min(320px, 30vw); height: 200px; overflow: hidden; background-color: rgba(0,0,0,0.6); border-radius: 12px; padding: 8px; color: white; z-index: 1005; backdrop-filter: blur(15px); border: 1px solid rgba(255,255,255,0.08); display: flex; flex-direction: column;">
<div id="chatOverlay" style="position: absolute; bottom: 15px; right: 15px; width: min(320px, 30vw); height: 200px; overflow: hidden; background-color: rgba(0,0,0,0.5); border-radius: 12px; padding: 8px; color: white; z-index: 1005; backdrop-filter: blur(15px); border: 1px solid rgba(255,255,255,0.08); display: flex; flex-direction: column;">
<div id="chatMessages" style="overflow: hidden; flex: 1; margin-bottom: 3px; display: flex; flex-direction: column; justify-content: flex-end; position: relative; cursor: pointer;">
<!-- 消息将在这里动态添加 -->
</div>
<div class="chat-header">
💬 对话
<button class="clear-chat" onclick="clearChatHistory()" title="清空对话记录"></button>
<button class="clear-chat" onclick="toggleChatOverlay()" title="隐藏对话框"></button>
</div>
</div>
</div>
... ... @@ -939,6 +966,62 @@
if (storageEnabled) {
setTimeout(loadChatHistory, 1000); // 延迟1秒加载,确保页面完全加载
}
// 初始化对话框配置
loadChatOverlayConfig();
// 对话框显示/隐藏开关
$('#show-chat-overlay').change(function() {
const chatOverlay = document.getElementById('chatOverlay');
if (this.checked) {
chatOverlay.style.display = 'flex';
localStorage.setItem('chatOverlayVisible', 'true');
} else {
chatOverlay.style.display = 'none';
localStorage.setItem('chatOverlayVisible', 'false');
}
});
// 对话框透明度滑块
$('#chat-overlay-opacity').on('input', function() {
const opacity = this.value;
$('#opacity-value').text(opacity);
updateChatOverlayOpacity(parseInt(opacity));
});
// 消息框透明度滑块
$('#message-opacity').on('input', function() {
const opacity = this.value;
$('#message-opacity-value').text(opacity);
updateMessageOpacity(parseInt(opacity));
});
// 重置对话框配置
$('#reset-chat-config').click(function() {
// 重置为默认值
$('#show-chat-overlay').prop('checked', true);
$('#chat-overlay-opacity').val(50);
$('#opacity-value').text('50');
$('#message-opacity').val(50);
$('#message-opacity-value').text('50');
// 应用默认设置
document.getElementById('chatOverlay').style.display = 'flex';
updateChatOverlayOpacity(50);
updateMessageOpacity(50);
// 清除本地存储
localStorage.removeItem('chatOverlayVisible');
localStorage.removeItem('chatOverlayOpacity');
localStorage.removeItem('messageOpacity');
// 提示用户
const originalText = $(this).text();
$(this).text('已重置!').prop('disabled', true);
setTimeout(() => {
$(this).text(originalText).prop('disabled', false);
}, 1500);
});
});
$('#btn_start_record').click(function() {
... ... @@ -1279,6 +1362,94 @@
}
localStorage.removeItem('chatHistory');
}
// 切换对话框显示/隐藏
function toggleChatOverlay() {
const chatOverlay = document.getElementById('chatOverlay');
const showCheckbox = document.getElementById('show-chat-overlay');
if (chatOverlay.style.display === 'none') {
chatOverlay.style.display = 'flex';
showCheckbox.checked = true;
localStorage.setItem('chatOverlayVisible', 'true');
} else {
chatOverlay.style.display = 'none';
showCheckbox.checked = false;
localStorage.setItem('chatOverlayVisible', 'false');
}
}
// 更新对话框透明度
function updateChatOverlayOpacity(opacity) {
const chatOverlay = document.getElementById('chatOverlay');
const newBgColor = `rgba(0,0,0,${opacity / 100})`;
chatOverlay.style.backgroundColor = newBgColor;
localStorage.setItem('chatOverlayOpacity', opacity);
}
// 更新消息框透明度
function updateMessageOpacity(opacity) {
const style = document.createElement('style');
style.id = 'dynamic-message-opacity';
// 移除旧的样式
const oldStyle = document.getElementById('dynamic-message-opacity');
if (oldStyle) {
oldStyle.remove();
}
style.innerHTML = `
#chatOverlay .text-container {
background-color: rgba(255,255,255,${opacity / 100}) !important;
}
#chatOverlay .message.right .text-container {
background-color: rgba(66,133,244,${opacity / 100}) !important;
}
#chatOverlay .message.left .text-container {
background-color: rgba(248,249,250,${opacity / 100}) !important;
}
#chatOverlay .message.left.mode-echo .text-container {
background-color: rgba(255,235,59,${opacity / 100}) !important;
}
#chatOverlay .message.left.mode-chat .text-container {
background-color: rgba(76,175,80,${opacity / 100}) !important;
}
#chatOverlay .message.left.mode-audio .text-container {
background-color: rgba(156,39,176,${opacity / 100}) !important;
}
#chatOverlay .message.left.mode-plaintext .text-container {
background-color: rgba(96,125,139,${opacity / 100}) !important;
}
#chatOverlay .avatar {
background-color: rgba(255,255,255,${opacity / 100}) !important;
}
`;
document.head.appendChild(style);
localStorage.setItem('messageOpacity', opacity);
}
// 加载对话框配置
function loadChatOverlayConfig() {
// 加载显示状态
const isVisible = localStorage.getItem('chatOverlayVisible');
if (isVisible === 'false') {
document.getElementById('chatOverlay').style.display = 'none';
document.getElementById('show-chat-overlay').checked = false;
}
// 加载透明度设置
const overlayOpacity = localStorage.getItem('chatOverlayOpacity') || '50';
const messageOpacity = localStorage.getItem('messageOpacity') || '50';
document.getElementById('chat-overlay-opacity').value = overlayOpacity;
document.getElementById('opacity-value').textContent = overlayOpacity;
updateChatOverlayOpacity(parseInt(overlayOpacity));
document.getElementById('message-opacity').value = messageOpacity;
document.getElementById('message-opacity-value').textContent = messageOpacity;
updateMessageOpacity(parseInt(messageOpacity));
}
// 初始化聊天滚轮支持
function initChatWheelSupport() {
... ...
No preview for this file type
No preview for this file type
No preview for this file type