funasr_sync_optimization.md
11.4 KB
AIfeng/2025-07-17 16:38:52
FunASRSync大文件处理优化方案
当前状态确认
启用方案
✅ 当前项目启用的是 funasr_asr_sync.py 同步版本
-
主程序:
app.py第415行导入from funasr_asr_sync import FunASRSync -
实例化:
create_asr_connection()函数中创建FunASRSync(username)实例 -
调用路径:
humanaudio()→handle_funasr()→ensure_asr_connection()→create_asr_connection()
当前实现分析
连接配置:
- WebSocket连接超时: 5秒 (第195-219行)
- 发送间隔: 0.04秒 (第106-120行)
- 重连机制: 指数退避,初始延迟1秒
大文件处理机制:
- 音频数据通过Base64编码一次性发送 (第160-190行)
- 无分块处理机制
- 无流控制
- 无超时重试
问题分析
1. 大文件超时根因
主要问题:
- 一次性发送: 大文件Base64编码后体积增大33%,单次发送易超时
- 无分块机制: 缺乏音频数据分片处理
- 连接超时过短: 5秒超时对大文件处理不足
- 无进度反馈: 客户端无法感知处理进度
技术瓶颈:
- Base64编码内存占用高
- WebSocket单帧大小限制
- 网络传输稳定性依赖
2. 性能影响评估
| 文件大小 | Base64后大小 | 预估传输时间 | 超时风险 |
|---|---|---|---|
| 1MB | 1.33MB | 1-2秒 | 低 |
| 5MB | 6.65MB | 5-10秒 | 中 |
| 10MB | 13.3MB | 10-20秒 | 高 |
| 20MB+ | 26.6MB+ | 20秒+ | 极高 |
优化方案
阶段一:立即可行优化 (1-2天)
1.1 超时参数调优
# 配置优化建议
ASR_CONNECTION_TIMEOUT = 30 # 连接超时从5秒增加到30秒
ASR_SEND_TIMEOUT = 60 # 新增发送超时配置
ASR_CHUNK_SIZE = 1024 * 512 # 512KB分块大小
ASR_SEND_INTERVAL = 0.02 # 发送间隔从0.04秒减少到0.02秒
1.2 分块发送机制
def send_audio_data_chunked(self, audio_bytes, filename="audio.wav", chunk_size=512*1024):
"""分块发送音频数据"""
import base64
import math
try:
# 数据预处理
if hasattr(audio_bytes, 'tobytes'):
audio_bytes = audio_bytes.tobytes()
elif isinstance(audio_bytes, memoryview):
audio_bytes = bytes(audio_bytes)
total_size = len(audio_bytes)
total_chunks = math.ceil(total_size / chunk_size)
util.log(1, f"开始分块发送: {filename}, 总大小: {total_size} bytes, 分块数: {total_chunks}")
# 发送开始信号
start_frame = {
'type': 'audio_start',
'filename': filename,
'total_size': total_size,
'total_chunks': total_chunks,
'chunk_size': chunk_size
}
self._send_frame_with_retry(start_frame)
# 分块发送
for i in range(total_chunks):
start_pos = i * chunk_size
end_pos = min(start_pos + chunk_size, total_size)
chunk_data = audio_bytes[start_pos:end_pos]
# Base64编码分块
chunk_b64 = base64.b64encode(chunk_data).decode('utf-8')
chunk_frame = {
'type': 'audio_chunk',
'filename': filename,
'chunk_index': i,
'chunk_data': chunk_b64,
'is_last': (i == total_chunks - 1)
}
# 发送分块并等待确认
success = self._send_frame_with_retry(chunk_frame)
if not success:
util.log(3, f"分块 {i+1}/{total_chunks} 发送失败")
return False
# 进度日志
if (i + 1) % 10 == 0 or i == total_chunks - 1:
progress = ((i + 1) / total_chunks) * 100
util.log(1, f"发送进度: {progress:.1f}% ({i+1}/{total_chunks})")
# 流控延迟
time.sleep(0.01)
# 发送结束信号
end_frame = {
'type': 'audio_end',
'filename': filename
}
self._send_frame_with_retry(end_frame)
util.log(1, f"音频数据分块发送完成: {filename}")
return True
except Exception as e:
util.log(3, f"分块发送音频数据时出错: {e}")
return False
1.3 重试机制增强
def _send_frame_with_retry(self, frame, max_retries=3, timeout=10):
"""带重试的帧发送"""
for attempt in range(max_retries):
try:
if self.__ws and self.__connected:
# 设置发送超时
start_time = time.time()
self.__ws.send(json.dumps(frame))
# 简单的发送确认检查
time.sleep(0.05) # 等待发送完成
if time.time() - start_time < timeout:
return True
else:
util.log(2, f"发送超时,尝试 {attempt + 1}/{max_retries}")
else:
util.log(2, f"连接不可用,尝试 {attempt + 1}/{max_retries}")
except Exception as e:
util.log(2, f"发送失败,尝试 {attempt + 1}/{max_retries}: {e}")
if attempt < max_retries - 1:
time.sleep(0.5 * (attempt + 1)) # 指数退避
return False
阶段二:稳定性优化 (3-5天)
2.1 连接健康检查
def _health_check(self):
"""连接健康检查"""
if not self.__connected:
return False
try:
# 发送心跳包
ping_frame = {'type': 'ping', 'timestamp': time.time()}
self.__ws.send(json.dumps(ping_frame))
return True
except Exception as e:
util.log(2, f"健康检查失败: {e}")
return False
def _start_health_monitor(self):
"""启动健康监控线程"""
def monitor():
while self.__connected:
if not self._health_check():
util.log(2, "连接健康检查失败,尝试重连")
self.__attempt_reconnect()
time.sleep(30) # 30秒检查一次
Thread(target=monitor, daemon=True).start()
2.2 流控机制
class FlowController:
"""流量控制器"""
def __init__(self, max_pending=5, window_size=1024*1024):
self.max_pending = max_pending
self.window_size = window_size
self.pending_chunks = 0
self.sent_bytes = 0
self.last_reset = time.time()
def can_send(self, chunk_size):
"""检查是否可以发送"""
# 检查待处理分块数
if self.pending_chunks >= self.max_pending:
return False
# 检查窗口大小
now = time.time()
if now - self.last_reset > 1.0: # 每秒重置
self.sent_bytes = 0
self.last_reset = now
if self.sent_bytes + chunk_size > self.window_size:
return False
return True
def on_send(self, chunk_size):
"""发送时调用"""
self.pending_chunks += 1
self.sent_bytes += chunk_size
def on_ack(self):
"""收到确认时调用"""
self.pending_chunks = max(0, self.pending_chunks - 1)
阶段三:架构优化 (1周)
3.1 异步发送队列
import asyncio
from queue import Queue
class AsyncSendQueue:
"""异步发送队列"""
def __init__(self, max_size=100):
self.queue = Queue(maxsize=max_size)
self.running = False
self.worker_thread = None
def start(self):
"""启动发送队列"""
self.running = True
self.worker_thread = Thread(target=self._worker, daemon=True)
self.worker_thread.start()
def _worker(self):
"""队列工作线程"""
while self.running:
try:
item = self.queue.get(timeout=1)
if item is None: # 停止信号
break
frame, callback = item
success = self._send_frame(frame)
if callback:
callback(success)
except Exception as e:
util.log(3, f"发送队列工作异常: {e}")
def enqueue(self, frame, callback=None):
"""入队"""
try:
self.queue.put((frame, callback), timeout=5)
return True
except:
return False
配置文件更新
config_util.py 新增配置
# FunASR大文件优化配置
asr_connection_timeout = 30 # 连接超时(秒)
asr_send_timeout = 60 # 发送超时(秒)
asr_chunk_size = 512 * 1024 # 分块大小(bytes)
asr_max_retries = 3 # 最大重试次数
asr_send_interval = 0.02 # 发送间隔(秒)
asr_flow_control_window = 1024 * 1024 # 流控窗口大小
asr_max_pending_chunks = 5 # 最大待处理分块数
asr_health_check_interval = 30 # 健康检查间隔(秒)
实施计划
第1天:立即优化
- 更新超时配置
- 实现基础分块发送
- 添加重试机制
- 测试小文件兼容性
第2-3天:稳定性增强
- 实现连接健康检查
- 添加流控机制
- 优化错误处理
- 大文件测试验证
第4-5天:性能调优
- 异步发送队列
- 内存使用优化
- 并发处理能力
- 压力测试
第6-7天:监控与文档
- 添加性能监控
- 完善日志记录
- 更新技术文档
- 用户使用指南
监控指标
关键指标
- 传输成功率: 目标 >99%
- 平均传输时间: 大文件 <30秒
- 重连频率: <1次/小时
- 内存使用: 峰值 <原来150%
监控实现
class ASRMetrics:
"""ASR性能指标收集"""
def __init__(self):
self.total_requests = 0
self.success_requests = 0
self.total_bytes = 0
self.total_time = 0
self.reconnect_count = 0
def record_request(self, size_bytes, duration_seconds, success):
self.total_requests += 1
self.total_bytes += size_bytes
self.total_time += duration_seconds
if success:
self.success_requests += 1
def get_stats(self):
if self.total_requests == 0:
return {}
return {
'success_rate': self.success_requests / self.total_requests,
'avg_throughput': self.total_bytes / self.total_time if self.total_time > 0 else 0,
'avg_duration': self.total_time / self.total_requests,
'reconnect_rate': self.reconnect_count / self.total_requests
}
风险评估
技术风险
- 兼容性: 分块机制需要服务端支持
- 性能: 分块可能增加延迟
- 复杂性: 错误处理逻辑复杂化
缓解措施
- 保留原有发送方式作为降级方案
- 渐进式部署,先小范围测试
- 完善监控和告警机制
预期效果
性能提升
- 大文件(>5MB)成功率从60%提升到95%+
- 传输超时减少80%
- 用户体验显著改善
系统稳定性
- 连接稳定性提升
- 错误恢复能力增强
- 资源使用更合理
优化重点: 当前项目确实使用FunASRSync同步版本,主要问题是大文件一次性Base64发送导致超时。通过分块发送、重试机制和流控优化,可以显著改善大文件处理能力。