result_processor.py
6.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# AIfeng/2025-07-17 13:58:00
"""
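Doubao ASR recognition result processor.
Handles Doubao ASR streaming recognition results: extracts the key information and optimizes log output.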
"""
import logging
from typing import Dict, Any, Optional, Callable
from dataclasses import dataclass
from datetime import datetime
@dataclass
class ASRResult:
    """A single ASR recognition result.

    Attributes:
        text: The recognized text.
        is_final: Whether this is the final result rather than an interim one.
        confidence: Confidence score; defaults to 0.0 when not supplied.
        timestamp: When the result was created. If omitted (or ``None``), it
            is filled in with ``datetime.now()`` (naive local time) right
            after construction.
        sequence: Sequence number of the result; defaults to 0.
    """
    text: str
    is_final: bool
    confidence: float = 0.0
    # ``None`` is only a placeholder meaning "stamp at construction time";
    # the annotation is Optional so it matches the declared default.
    timestamp: Optional[datetime] = None
    sequence: int = 0

    def __post_init__(self):
        """Replace a missing timestamp with the current local time."""
        if self.timestamp is None:
            self.timestamp = datetime.now()
class DoubaoResultProcessor:
    """Processor for Doubao ASR streaming recognition results.

    Extracts the recognized text from each raw result dict, keeps track of
    the latest text, sequence number and number of results seen, and logs
    final results (plus every interim update when streaming logs are on).
    """

    def __init__(self,
                 text_only: bool = True,
                 log_level: str = 'INFO',
                 enable_streaming_log: bool = False):
        """Initialize the result processor.

        Args:
            text_only: Whether to output text content only. The flag is
                stored on the instance; no method of this class reads it.
            log_level: Name of a ``logging`` level (case-insensitive).
            enable_streaming_log: Whether to log every interim result
                (this logs frequently while a stream is active).
        """
        self.text_only = text_only
        self.enable_streaming_log = enable_streaming_log
        self.logger = self._setup_logger(log_level)

        # Streaming state: each new result overwrites the previous one.
        self.current_text = ""
        self.last_sequence = 0
        self.result_count = 0

    def _setup_logger(self, log_level: str) -> logging.Logger:
        """Create (or fetch) this instance's logger and set its level.

        Args:
            log_level: Name of a ``logging`` level (case-insensitive).

        Returns:
            The configured logger.

        Raises:
            AttributeError: If ``log_level.upper()`` is not an attribute of
                the ``logging`` module.
        """
        # The name embeds id(self) so each instance can have its own level.
        # NOTE(review): the logging module keeps named loggers for the life
        # of the process, so creating many processors accumulates loggers.
        logger = logging.getLogger(f"DoubaoResultProcessor_{id(self)}")
        logger.setLevel(getattr(logging, log_level.upper()))

        # Only attach a handler once, in case the logger already exists.
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)

        return logger

    def extract_text_from_result(
            self, result: Dict[str, Any]) -> Optional["ASRResult"]:
        """Extract the text information from a full Doubao ASR result.

        Args:
            result: The complete result dict returned by Doubao ASR. The text
                is read from ``result['payload_msg']['result']['text']``.

        Returns:
            An ``ASRResult``, or ``None`` when there is no payload, no result
            section, no non-blank text, or when extraction fails (the failure
            is logged at ERROR level).
        """
        try:
            payload_msg = result.get('payload_msg')
            if not payload_msg:
                return None

            result_data = payload_msg.get('result')
            if not result_data:
                return None

            # A present-but-null 'text' means "no text", not a failure, so it
            # must not reach .strip() and be reported as an error.
            text = (result_data.get('text') or '').strip()
            if not text:
                return None

            return ASRResult(
                text=text,
                is_final=result.get('is_last_package', False),
                confidence=result_data.get('confidence', 0.0),
                sequence=result.get('payload_sequence', 0),
            )
        except Exception as e:
            # Best effort: a malformed message must not break the stream.
            self.logger.error(f"提取ASR结果文本失败: {e}")
            return None

    def process_streaming_result(self, result: Dict[str, Any]) -> Optional[str]:
        """Process one streaming recognition result.

        Args:
            result: The complete result dict returned by Doubao ASR.

        Returns:
            The current recognized text, or ``None`` when the result carries
            no text or the text is identical to the previous one.
        """
        asr_result = self.extract_text_from_result(result)
        if not asr_result:
            return None

        self.result_count += 1

        # Streaming semantics: the newest result replaces the previous one.
        previous_text = self.current_text
        self.current_text = asr_result.text
        self.last_sequence = asr_result.sequence

        if asr_result.is_final:
            self.logger.info(f"[最终结果] {asr_result.text}")
        elif self.enable_streaming_log:
            # Logged at INFO, not DEBUG: the logger defaults to INFO, so a
            # DEBUG record would be dropped and the explicit opt-in flag
            # would silently do nothing.
            self.logger.info(
                f"[流式更新 #{self.result_count}] {asr_result.text}"
            )

        # Only report the text when it actually changed.
        return asr_result.text if asr_result.text != previous_text else None

    def create_optimized_callback(
            self,
            user_callback: Optional[Callable[[str], None]] = None
    ) -> Callable[[Dict[str, Any]], None]:
        """Create a callback that feeds raw results through this processor.

        Args:
            user_callback: Optional function that receives the recognized
                text each time it changes.

        Returns:
            A function accepting a raw result dict. It never raises: any
            exception (including one from ``user_callback``) is logged.
        """
        def optimized_callback(result: Dict[str, Any]):
            """Process one raw result and forward changed text to the user."""
            try:
                text = self.process_streaming_result(result)

                # Forward only when the text changed and a callback exists.
                if text and user_callback:
                    user_callback(text)
            except Exception as e:
                # Keep the traceback: the failure usually originates in the
                # user's callback and the message alone rarely locates it.
                self.logger.error(f"处理ASR回调失败: {e}", exc_info=True)

        return optimized_callback

    def get_current_result(self) -> Dict[str, Any]:
        """Return the current recognition state.

        Returns:
            A dict with ``current_text``, ``last_sequence``, ``result_count``
            and ``text_length`` (length of ``current_text``).
        """
        return {
            'current_text': self.current_text,
            'last_sequence': self.last_sequence,
            'result_count': self.result_count,
            'text_length': len(self.current_text)
        }

    def reset(self):
        """Reset the streaming state and log that the reset happened."""
        self.current_text = ""
        self.last_sequence = 0
        self.result_count = 0
        self.logger.info("结果处理器已重置")
# Convenience functions
def create_text_only_callback(user_callback: Optional[Callable[[str], None]] = None,
                              enable_streaming_log: bool = False,
                              log_level: str = 'INFO') -> Callable[[Dict[str, Any]], None]:
    """Create a callback that only deals with the recognized text.

    Builds a dedicated ``DoubaoResultProcessor`` and returns its optimized
    callback, so each call to this function yields independent state.

    Args:
        user_callback: Optional function that receives the recognized text.
        enable_streaming_log: Whether the processor logs interim results.
        log_level: ``logging`` level name forwarded to the processor's
            logger. Previously the processor default was always used, which
            left callers of this helper no way to change the threshold.

    Returns:
        A callback accepting a raw Doubao ASR result dict.
    """
    processor = DoubaoResultProcessor(
        text_only=True,
        log_level=log_level,
        enable_streaming_log=enable_streaming_log,
    )
    return processor.create_optimized_callback(user_callback)
def extract_text_only(result: Dict[str, Any]) -> Optional[str]:
    """Pull just the recognized text out of a Doubao ASR result.

    Args:
        result: The complete Doubao ASR result; the text is looked up at
            ``result['payload_msg']['result']['text']``.

    Returns:
        The text with surrounding whitespace removed, or ``None`` when the
        text is missing or blank, or when the lookup fails for any reason.
    """
    try:
        payload = result.get('payload_msg', {})
        recognition = payload.get('result', {})
        stripped = recognition.get('text', '').strip()
        return stripped if stripped else None
    except Exception:
        # Best effort: any unexpected shape simply means "no text".
        return None