funasr_sync_optimization.md 11.4 KB

AIfeng/2025-07-17 16:38:52

FunASRSync大文件处理优化方案

当前状态确认

启用方案

当前项目启用的是 funasr_asr_sync.py 同步版本

  • 主程序: app.py 第415行导入 from funasr_asr_sync import FunASRSync
  • 实例化: create_asr_connection() 函数中创建 FunASRSync(username) 实例
  • 调用路径: humanaudio()handle_funasr()ensure_asr_connection()create_asr_connection()

当前实现分析

连接配置:

  • WebSocket连接超时: 5秒 (第195-219行)
  • 发送间隔: 0.04秒 (第106-120行)
  • 重连机制: 指数退避,初始延迟1秒

大文件处理机制:

  • 音频数据通过Base64编码一次性发送 (第160-190行)
  • 无分块处理机制
  • 无流控制
  • 无超时重试

问题分析

1. 大文件超时根因

主要问题:

  • 一次性发送: 大文件Base64编码后体积增大33%,单次发送易超时
  • 无分块机制: 缺乏音频数据分片处理
  • 连接超时过短: 5秒超时对大文件处理不足
  • 无进度反馈: 客户端无法感知处理进度

技术瓶颈:

  • Base64编码内存占用高
  • WebSocket单帧大小限制
  • 网络传输稳定性依赖

2. 性能影响评估

文件大小 Base64后大小 预估传输时间 超时风险
1MB 1.33MB 1-2秒
5MB 6.65MB 5-10秒
10MB 13.3MB 10-20秒
20MB+ 26.6MB+ 20秒+ 极高

优化方案

阶段一:立即可行优化 (1-2天)

1.1 超时参数调优

# 配置优化建议
ASR_CONNECTION_TIMEOUT = 30  # 连接超时从5秒增加到30秒
ASR_SEND_TIMEOUT = 60       # 新增发送超时配置
ASR_CHUNK_SIZE = 1024 * 512 # 512KB分块大小
ASR_SEND_INTERVAL = 0.02    # 发送间隔从0.04秒减少到0.02秒

1.2 分块发送机制

def send_audio_data_chunked(self, audio_bytes, filename="audio.wav", chunk_size=512*1024):
    """分块发送音频数据"""
    import base64
    import math

    try:
        # 数据预处理
        if hasattr(audio_bytes, 'tobytes'):
            audio_bytes = audio_bytes.tobytes()
        elif isinstance(audio_bytes, memoryview):
            audio_bytes = bytes(audio_bytes)

        total_size = len(audio_bytes)
        total_chunks = math.ceil(total_size / chunk_size)

        util.log(1, f"开始分块发送: {filename}, 总大小: {total_size} bytes, 分块数: {total_chunks}")

        # 发送开始信号
        start_frame = {
            'type': 'audio_start',
            'filename': filename,
            'total_size': total_size,
            'total_chunks': total_chunks,
            'chunk_size': chunk_size
        }
        self._send_frame_with_retry(start_frame)

        # 分块发送
        for i in range(total_chunks):
            start_pos = i * chunk_size
            end_pos = min(start_pos + chunk_size, total_size)
            chunk_data = audio_bytes[start_pos:end_pos]

            # Base64编码分块
            chunk_b64 = base64.b64encode(chunk_data).decode('utf-8')

            chunk_frame = {
                'type': 'audio_chunk',
                'filename': filename,
                'chunk_index': i,
                'chunk_data': chunk_b64,
                'is_last': (i == total_chunks - 1)
            }

            # 发送分块并等待确认
            success = self._send_frame_with_retry(chunk_frame)
            if not success:
                util.log(3, f"分块 {i+1}/{total_chunks} 发送失败")
                return False

            # 进度日志
            if (i + 1) % 10 == 0 or i == total_chunks - 1:
                progress = ((i + 1) / total_chunks) * 100
                util.log(1, f"发送进度: {progress:.1f}% ({i+1}/{total_chunks})")

            # 流控延迟
            time.sleep(0.01)

        # 发送结束信号
        end_frame = {
            'type': 'audio_end',
            'filename': filename
        }
        self._send_frame_with_retry(end_frame)

        util.log(1, f"音频数据分块发送完成: {filename}")
        return True

    except Exception as e:
        util.log(3, f"分块发送音频数据时出错: {e}")
        return False

1.3 重试机制增强

def _send_frame_with_retry(self, frame, max_retries=3, timeout=10):
    """带重试的帧发送"""
    for attempt in range(max_retries):
        try:
            if self.__ws and self.__connected:
                # 设置发送超时
                start_time = time.time()
                self.__ws.send(json.dumps(frame))

                # 简单的发送确认检查
                time.sleep(0.05)  # 等待发送完成

                if time.time() - start_time < timeout:
                    return True
                else:
                    util.log(2, f"发送超时,尝试 {attempt + 1}/{max_retries}")
            else:
                util.log(2, f"连接不可用,尝试 {attempt + 1}/{max_retries}")

        except Exception as e:
            util.log(2, f"发送失败,尝试 {attempt + 1}/{max_retries}: {e}")

        if attempt < max_retries - 1:
            time.sleep(0.5 * (attempt + 1))  # 指数退避

    return False

阶段二:稳定性优化 (3-5天)

2.1 连接健康检查

def _health_check(self):
    """连接健康检查"""
    if not self.__connected:
        return False

    try:
        # 发送心跳包
        ping_frame = {'type': 'ping', 'timestamp': time.time()}
        self.__ws.send(json.dumps(ping_frame))
        return True
    except Exception as e:
        util.log(2, f"健康检查失败: {e}")
        return False

def _start_health_monitor(self):
    """启动健康监控线程"""
    def monitor():
        while self.__connected:
            if not self._health_check():
                util.log(2, "连接健康检查失败,尝试重连")
                self.__attempt_reconnect()
            time.sleep(30)  # 30秒检查一次

    Thread(target=monitor, daemon=True).start()

2.2 流控机制

class FlowController:
    """流量控制器"""
    def __init__(self, max_pending=5, window_size=1024*1024):
        self.max_pending = max_pending
        self.window_size = window_size
        self.pending_chunks = 0
        self.sent_bytes = 0
        self.last_reset = time.time()

    def can_send(self, chunk_size):
        """检查是否可以发送"""
        # 检查待处理分块数
        if self.pending_chunks >= self.max_pending:
            return False

        # 检查窗口大小
        now = time.time()
        if now - self.last_reset > 1.0:  # 每秒重置
            self.sent_bytes = 0
            self.last_reset = now

        if self.sent_bytes + chunk_size > self.window_size:
            return False

        return True

    def on_send(self, chunk_size):
        """发送时调用"""
        self.pending_chunks += 1
        self.sent_bytes += chunk_size

    def on_ack(self):
        """收到确认时调用"""
        self.pending_chunks = max(0, self.pending_chunks - 1)

阶段三:架构优化 (1周)

3.1 异步发送队列

import asyncio
from queue import Queue

class AsyncSendQueue:
    """异步发送队列"""
    def __init__(self, max_size=100):
        self.queue = Queue(maxsize=max_size)
        self.running = False
        self.worker_thread = None

    def start(self):
        """启动发送队列"""
        self.running = True
        self.worker_thread = Thread(target=self._worker, daemon=True)
        self.worker_thread.start()

    def _worker(self):
        """队列工作线程"""
        while self.running:
            try:
                item = self.queue.get(timeout=1)
                if item is None:  # 停止信号
                    break

                frame, callback = item
                success = self._send_frame(frame)
                if callback:
                    callback(success)

            except Exception as e:
                util.log(3, f"发送队列工作异常: {e}")

    def enqueue(self, frame, callback=None):
        """入队"""
        try:
            self.queue.put((frame, callback), timeout=5)
            return True
        except:
            return False

配置文件更新

config_util.py 新增配置

# FunASR大文件优化配置
asr_connection_timeout = 30      # 连接超时(秒)
asr_send_timeout = 60           # 发送超时(秒)
asr_chunk_size = 512 * 1024     # 分块大小(bytes)
asr_max_retries = 3             # 最大重试次数
asr_send_interval = 0.02        # 发送间隔(秒)
asr_flow_control_window = 1024 * 1024  # 流控窗口大小
asr_max_pending_chunks = 5      # 最大待处理分块数
asr_health_check_interval = 30  # 健康检查间隔(秒)

实施计划

第1天:立即优化

  • 更新超时配置
  • 实现基础分块发送
  • 添加重试机制
  • 测试小文件兼容性

第2-3天:稳定性增强

  • 实现连接健康检查
  • 添加流控机制
  • 优化错误处理
  • 大文件测试验证

第4-5天:性能调优

  • 异步发送队列
  • 内存使用优化
  • 并发处理能力
  • 压力测试

第6-7天:监控与文档

  • 添加性能监控
  • 完善日志记录
  • 更新技术文档
  • 用户使用指南

监控指标

关键指标

  • 传输成功率: 目标 >99%
  • 平均传输时间: 大文件 <30秒
  • 重连频率: <1次/小时
  • 内存使用: 峰值 <原来150%

监控实现

class ASRMetrics:
    """ASR性能指标收集"""
    def __init__(self):
        self.total_requests = 0
        self.success_requests = 0
        self.total_bytes = 0
        self.total_time = 0
        self.reconnect_count = 0

    def record_request(self, size_bytes, duration_seconds, success):
        self.total_requests += 1
        self.total_bytes += size_bytes
        self.total_time += duration_seconds
        if success:
            self.success_requests += 1

    def get_stats(self):
        if self.total_requests == 0:
            return {}

        return {
            'success_rate': self.success_requests / self.total_requests,
            'avg_throughput': self.total_bytes / self.total_time if self.total_time > 0 else 0,
            'avg_duration': self.total_time / self.total_requests,
            'reconnect_rate': self.reconnect_count / self.total_requests
        }

风险评估

技术风险

  • 兼容性: 分块机制需要服务端支持
  • 性能: 分块可能增加延迟
  • 复杂性: 错误处理逻辑复杂化

缓解措施

  • 保留原有发送方式作为降级方案
  • 渐进式部署,先小范围测试
  • 完善监控和告警机制

预期效果

性能提升

  • 大文件(>5MB)成功率从60%提升到95%+
  • 传输超时减少80%
  • 用户体验显著改善

系统稳定性

  • 连接稳定性提升
  • 错误恢复能力增强
  • 资源使用更合理

优化重点: 当前项目确实使用FunASRSync同步版本,主要问题是大文件一次性Base64发送导致超时。通过分块发送、重试机制和流控优化,可以显著改善大文件处理能力。