result_processor.py 6.81 KB
# AIfeng/2025-07-17 13:58:00
"""
豆包ASR识别结果处理器
专门处理豆包ASR流式识别结果,提取关键信息并优化日志输出
"""

import logging
from typing import Dict, Any, Optional, Callable
from dataclasses import dataclass
from datetime import datetime


@dataclass
class ASRResult:
    """ASR识别结果数据类"""
    text: str
    is_final: bool
    confidence: float = 0.0
    timestamp: datetime = None
    sequence: int = 0
    
    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now()


class DoubaoResultProcessor:
    """豆包ASR结果处理器"""
    
    def __init__(self, 
                 text_only: bool = True,
                 log_level: str = 'INFO',
                 enable_streaming_log: bool = False):
        """
        初始化结果处理器
        
        Args:
            text_only: 是否只输出文本内容
            log_level: 日志级别
            enable_streaming_log: 是否启用流式日志(会频繁输出中间结果)
        """
        self.text_only = text_only
        self.enable_streaming_log = enable_streaming_log
        self.logger = self._setup_logger(log_level)
        
        # 流式结果管理
        self.current_text = ""
        self.last_sequence = 0
        self.result_count = 0
        
    def _setup_logger(self, log_level: str) -> logging.Logger:
        """设置日志记录器"""
        logger = logging.getLogger(f"DoubaoResultProcessor_{id(self)}")
        logger.setLevel(getattr(logging, log_level.upper()))
        
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)
            
        return logger
    
    def extract_text_from_result(self, result: Dict[str, Any]) -> Optional[ASRResult]:
        """
        从豆包ASR完整结果中提取文本信息
        
        Args:
            result: 豆包ASR返回的完整结果字典
            
        Returns:
            ASRResult: 提取的结果对象,如果无有效文本则返回None
        """
        try:
            # 检查是否有payload_msg
            payload_msg = result.get('payload_msg')
            if not payload_msg:
                return None
            
            # 提取result字段
            result_data = payload_msg.get('result')
            if not result_data:
                return None
            
            # 提取文本
            text = result_data.get('text', '').strip()
            if not text:
                return None
            
            # 提取其他信息
            is_final = result.get('is_last_package', False)
            confidence = result_data.get('confidence', 0.0)
            sequence = result.get('payload_sequence', 0)
            
            return ASRResult(
                text=text,
                is_final=is_final,
                confidence=confidence,
                sequence=sequence
            )
            
        except Exception as e:
            self.logger.error(f"提取ASR结果文本失败: {e}")
            return None
    
    def process_streaming_result(self, result: Dict[str, Any]) -> Optional[str]:
        """
        处理流式识别结果
        
        Args:
            result: 豆包ASR返回的完整结果字典
            
        Returns:
            str: 当前识别文本,如果无变化则返回None
        """
        asr_result = self.extract_text_from_result(result)
        if not asr_result:
            return None
        
        self.result_count += 1
        
        # 流式结果:后一次覆盖前一次
        previous_text = self.current_text
        self.current_text = asr_result.text
        self.last_sequence = asr_result.sequence
        
        # 根据配置决定是否记录日志
        if asr_result.is_final:
            self.logger.info(f"[最终结果] {asr_result.text}")
        elif self.enable_streaming_log:
            self.logger.debug(f"[流式更新 #{self.result_count}] {asr_result.text}")
        
        # 返回文本(如果与上次不同)
        return asr_result.text if asr_result.text != previous_text else None
    
    def create_optimized_callback(self, 
                                  user_callback: Optional[Callable[[str], None]] = None) -> Callable[[Dict[str, Any]], None]:
        """
        创建优化的回调函数
        
        Args:
            user_callback: 用户自定义回调函数,接收文本参数
            
        Returns:
            Callable: 优化后的回调函数
        """
        def optimized_callback(result: Dict[str, Any]):
            """优化的回调函数,只处理文本内容"""
            try:
                # 处理流式结果
                text = self.process_streaming_result(result)
                
                # 如果有文本变化且用户提供了回调函数
                if text and user_callback:
                    user_callback(text)
                    
            except Exception as e:
                self.logger.error(f"处理ASR回调失败: {e}")
        
        return optimized_callback
    
    def get_current_result(self) -> Dict[str, Any]:
        """
        获取当前识别状态
        
        Returns:
            Dict: 当前状态信息
        """
        return {
            'current_text': self.current_text,
            'last_sequence': self.last_sequence,
            'result_count': self.result_count,
            'text_length': len(self.current_text)
        }
    
    def reset(self):
        """重置处理器状态"""
        self.current_text = ""
        self.last_sequence = 0
        self.result_count = 0
        self.logger.info("结果处理器已重置")


# 便捷函数
def create_text_only_callback(user_callback: Optional[Callable[[str], None]] = None,
                              enable_streaming_log: bool = False) -> Callable[[Dict[str, Any]], None]:
    """
    创建只处理文本的回调函数
    
    Args:
        user_callback: 用户回调函数
        enable_streaming_log: 是否启用流式日志
        
    Returns:
        Callable: 优化的回调函数
    """
    processor = DoubaoResultProcessor(
        text_only=True,
        enable_streaming_log=enable_streaming_log
    )
    return processor.create_optimized_callback(user_callback)


def extract_text_only(result: Dict[str, Any]) -> Optional[str]:
    """
    从豆包ASR结果中只提取文本
    
    Args:
        result: 豆包ASR完整结果
        
    Returns:
        str: 提取的文本,如果无文本则返回None
    """
    try:
        return result.get('payload_msg', {}).get('result', {}).get('text', '').strip() or None
    except Exception:
        return None