adaptive_vad_chunking.py
26.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
# AIfeng/2025-07-07 15:25:48
# 自适应VAD分片优化模块 - 动态平衡响应速度与识别精度
import time
import numpy as np
from typing import List, Dict, Optional, Tuple, Any
from dataclasses import dataclass
from enum import Enum
import threading
import logging
from collections import deque
class ChunkStrategy(Enum):
    """Chunking strategy types.

    Each member's value is the lowercase string form of its name.
    """

    # Fast response
    FAST_RESPONSE = "fast_response"
    # High accuracy
    HIGH_ACCURACY = "high_accuracy"
    # Balanced mode
    BALANCED = "balanced"
    # Adaptive
    ADAPTIVE = "adaptive"
class RecognitionStage(Enum):
    """Recognition stages, listed from the earliest pass to the last one."""

    # Immediate recognition
    IMMEDIATE = "immediate"
    # Refined recognition
    REFINED = "refined"
    # Final recognition
    FINAL = "final"
@dataclass
class ChunkConfig:
    """Chunking configuration attached to one chunking strategy."""

    # Lower and upper bound of a chunk's length; the chunker multiplies the
    # chosen duration by the sample rate, i.e. these are seconds.
    min_duration: float
    max_duration: float
    # Confidence threshold (not read by the code visible in this module).
    confidence_threshold: float
    # Fraction of the chunk size that consecutive chunks share.
    overlap_ratio: float = 0.1
    # Relative weights of quality and speed (not read by the code visible in
    # this module).
    quality_weight: float = 0.5
    speed_weight: float = 0.5
@dataclass
class AudioChunk:
    """Audio chunk data structure: one slice of an audio buffer plus metadata."""

    # Raw bytes of this slice.
    data: bytes
    # Length of the slice and its start/end positions on the time axis.
    duration: float
    start_time: float
    end_time: float
    # Identifier of the chunk.
    chunk_id: str
    # Strategy that was active when the chunk was cut.
    strategy: ChunkStrategy
    # Recognition stage the chunk is associated with.
    stage: RecognitionStage
    confidence: float = 0.0
    is_processed: bool = False
    # Id of the chunk this one derives from, if any.
    parent_chunk_id: Optional[str] = None
    # Speech-detection flag.
    is_speech: bool = True
    # Timestamp attribute.
    timestamp: float = 0.0
@dataclass
class RecognitionResult:
    """Recognition result produced for one chunk at one recognition stage."""

    # Recognised text.
    text: str
    # Confidence reported for the text.
    confidence: float
    # Id of the chunk the result belongs to.
    chunk_id: str
    # Stage that produced the result.
    stage: RecognitionStage
    # Time spent producing the result.
    processing_time: float
    # Accuracy score attached to the result.
    accuracy_score: float = 0.0
class PerformanceMonitor:
    """Performance monitor keeping sliding windows of recent samples.

    Three bounded histories are kept (accuracy, latency, confidence); each
    holds at most ``window_size`` samples and drops the oldest one first.
    """

    def __init__(self, window_size: int = 20):
        self.window_size = window_size
        self.accuracy_history = deque(maxlen=window_size)
        self.latency_history = deque(maxlen=window_size)
        self.confidence_history = deque(maxlen=window_size)

    @staticmethod
    def _average(samples):
        """Return the mean of ``samples``, or ``0.0`` when there are none."""
        if not samples:
            return 0.0
        return np.mean(samples)

    def record_result(self, result: RecognitionResult, latency: float):
        """Record one recognition result together with its latency."""
        self.accuracy_history.append(result.accuracy_score)
        self.latency_history.append(latency)
        self.confidence_history.append(result.confidence)

    def get_recent_accuracy(self) -> float:
        """Return the mean accuracy over the window (0.0 when empty)."""
        return self._average(self.accuracy_history)

    def get_recent_latency(self) -> float:
        """Return the mean latency over the window (0.0 when empty)."""
        return self._average(self.latency_history)

    def get_recent_confidence(self) -> float:
        """Return the mean confidence over the window (0.0 when empty)."""
        return self._average(self.confidence_history)

    def update_metrics(self, metrics: Dict):
        """Append whichever of 'accuracy', 'latency', 'confidence' are present."""
        targets = (
            ('accuracy', self.accuracy_history),
            ('latency', self.latency_history),
            ('confidence', self.confidence_history),
        )
        for key, history in targets:
            if key in metrics:
                history.append(metrics[key])
class AdaptiveVADChunking:
    """Adaptive VAD chunking processor.

    Splits raw audio bytes (fixed-width samples) into overlapping
    :class:`AudioChunk` objects.  The active :class:`ChunkStrategy` selects a
    :class:`ChunkConfig`; when adaptation is enabled the strategy is
    re-evaluated from ``performance_monitor`` before every chunking pass.
    Produced chunks are also appended to ``processing_queue``, which is
    bounded by the ``max_chunk_buffer_size`` config key.
    """

    def __init__(self, config: Optional[Dict] = None):
        """Set up strategy presets, buffers, counters and the lock.

        Args:
            config: Optional settings dict.  A falsy value selects
                ``_get_default_config()``.  Keys read by this class:
                ``adaptation_enabled``, ``strategy_switch_threshold``,
                ``min_samples_for_adaptation``, ``max_chunk_buffer_size``,
                ``sample_rate``, ``bytes_per_sample``,
                ``enable_adaptive_strategy``.
        """
        self.config = config or self._get_default_config()

        # Per-strategy chunking presets.
        self.chunk_strategies = {
            ChunkStrategy.FAST_RESPONSE: ChunkConfig(
                min_duration=0.5,
                max_duration=2.0,
                confidence_threshold=0.7,
                quality_weight=0.3,
                speed_weight=0.7
            ),
            ChunkStrategy.HIGH_ACCURACY: ChunkConfig(
                min_duration=1.5,
                max_duration=4.0,
                confidence_threshold=0.8,
                quality_weight=0.8,
                speed_weight=0.2
            ),
            ChunkStrategy.BALANCED: ChunkConfig(
                min_duration=1.0,
                max_duration=3.0,
                confidence_threshold=0.75,
                quality_weight=0.5,
                speed_weight=0.5
            ),
            ChunkStrategy.ADAPTIVE: ChunkConfig(
                min_duration=0.8,
                max_duration=3.5,
                confidence_threshold=0.75,
                quality_weight=0.6,
                speed_weight=0.4
            )
        }

        self.current_strategy = ChunkStrategy.ADAPTIVE
        self.performance_monitor = PerformanceMonitor()
        self.chunk_buffer = []
        self.processing_queue = deque()

        # Adaptation parameters.
        self.adaptation_enabled = self.config.get('adaptation_enabled', True)
        self.strategy_switch_threshold = self.config.get('strategy_switch_threshold', 0.75)
        self.min_samples_for_adaptation = self.config.get('min_samples_for_adaptation', 10)

        self.logger = logging.getLogger(__name__)
        self._lock = threading.Lock()

        # Quality-feedback callbacks.
        self.quality_callbacks = []

        # Memory housekeeping: clean up at most once per interval (seconds).
        self.last_cleanup_time = time.time()
        self.cleanup_interval = 30.0

        # Counters reported by get_performance_stats(); maintained by
        # process_audio_data() so the stats reflect real activity.
        self.total_chunks_processed = 0
        self.speech_chunks = 0
        self.silence_chunks = 0
        self.average_chunk_duration = 0.0

    def get_performance_stats(self) -> Dict:
        """Return a snapshot of the active strategy and chunk counters."""
        with self._lock:
            strategy = self.current_strategy
            return {
                # set_strategy() accepts any object, so fall back to str().
                'current_strategy': strategy.value if hasattr(strategy, 'value') else str(strategy),
                'total_chunks_processed': self.total_chunks_processed,
                'speech_chunks': self.speech_chunks,
                'silence_chunks': self.silence_chunks,
                'average_chunk_duration': self.average_chunk_duration
            }

    def set_strategy(self, strategy):
        """Set the active VAD chunking strategy."""
        with self._lock:
            self.current_strategy = strategy
            self.logger.info(f"VAD策略已设置为: {strategy}")

    def register_quality_callback(self, callback):
        """Register a quality-feedback callback ``callback(chunk_id, metrics)``."""
        self.quality_callbacks.append(callback)
        self.logger.debug("注册质量反馈回调函数")

    def _trigger_quality_callbacks(self, chunk_id: str, quality_metrics: Dict):
        """Invoke every registered callback; a failing callback is logged and skipped."""
        for callback in self.quality_callbacks:
            try:
                callback(chunk_id, quality_metrics)
            except Exception as e:
                self.logger.error(f"质量反馈回调执行失败: {e}")

    def create_session(self, session_id: str):
        """Create a session (currently this only logs the session id)."""
        self.logger.info(f"VAD分片会话创建: {session_id}")

    def complete_session(self, session_id: str):
        """Finish a session: clear the buffers and cap the callback list at 10."""
        with self._lock:
            self.chunk_buffer.clear()
            self.processing_queue.clear()

            # Keep only the 10 most recently registered callbacks.
            if len(self.quality_callbacks) > 10:
                self.quality_callbacks = self.quality_callbacks[-10:]

        self.logger.info(f"VAD分片会话完成: {session_id},已清理缓存数据")

    def process_audio(self, session_id: str, audio_data: bytes, sample_rate: int, strategy: ChunkStrategy = None) -> List:
        """Chunk ``audio_data`` (entry point kept for OptimizationManager callers).

        NOTE(review): ``session_id``, ``sample_rate`` and ``strategy`` are
        accepted but not used; the sample rate comes from ``self.config`` and
        the strategy from ``self.current_strategy``.

        Returns:
            The chunks produced by ``process_audio_data``, or ``[]`` on error.
        """
        try:
            timestamp = time.time()
            chunks = self.process_audio_data(audio_data, timestamp)
            return chunks
        except Exception as e:
            self.logger.error(f"处理音频数据失败: {e}")
            return []

    def _get_default_config(self) -> Dict:
        """Return the default configuration dict."""
        return {
            'adaptation_enabled': True,
            'strategy_switch_threshold': 0.75,
            'min_samples_for_adaptation': 10,
            'max_chunk_buffer_size': 50,
            'progressive_recognition': True,
            'quality_feedback_enabled': True
        }

    def process_audio_data(self, audio_data: bytes, timestamp: float,
                           context: Dict = None) -> List[AudioChunk]:
        """Chunk ``audio_data`` and enqueue the resulting chunks.

        Args:
            audio_data: Raw audio bytes.
            timestamp: Time (seconds) assigned to the first byte.
            context: Optional hints for strategy selection
                (``interaction_mode``, ``noise_level``, ``user_patience``).

        Returns:
            The chunks created for this buffer, or ``[]`` if anything raised.
        """
        try:
            with self._lock:
                # Periodic housekeeping so the buffers cannot grow unbounded.
                current_time = time.time()
                if current_time - self.last_cleanup_time > self.cleanup_interval:
                    self._cleanup_memory()
                    self.last_cleanup_time = current_time

                if self.adaptation_enabled:
                    self._update_strategy(context or {})

                chunks = self._create_chunks(audio_data, timestamp)
                self._record_chunk_stats(chunks)

                # Bounded queue: evict the oldest chunk when full.
                max_queue_size = self.config.get('max_chunk_buffer_size', 50)
                for chunk in chunks:
                    if len(self.processing_queue) >= max_queue_size:
                        removed_chunk = self.processing_queue.popleft()
                        self.logger.warning(f"处理队列已满,移除分片: {removed_chunk.chunk_id}")
                    self.processing_queue.append(chunk)

                return chunks

        except Exception as e:
            self.logger.error(f"处理音频数据时出错: {e}")
            return []

    def _record_chunk_stats(self, chunks: List[AudioChunk]) -> None:
        """Fold newly created chunks into the running counters.

        Must be called with ``self._lock`` held.
        """
        if not chunks:
            return
        previous_total = self.total_chunks_processed
        new_total = previous_total + len(chunks)
        speech_count = sum(1 for chunk in chunks if chunk.is_speech)

        self.speech_chunks += speech_count
        self.silence_chunks += len(chunks) - speech_count
        # Running mean over every chunk seen so far.
        duration_sum = (self.average_chunk_duration * previous_total
                        + sum(chunk.duration for chunk in chunks))
        self.average_chunk_duration = duration_sum / new_total
        self.total_chunks_processed = new_total

    def _update_strategy(self, context: Dict):
        """Re-evaluate the strategy once enough accuracy samples exist."""
        if len(self.performance_monitor.accuracy_history) < self.min_samples_for_adaptation:
            return

        current_accuracy = self.performance_monitor.get_recent_accuracy()
        current_latency = self.performance_monitor.get_recent_latency()

        # Context hints supplied by the caller.
        interaction_mode = context.get('interaction_mode', 'normal')
        noise_level = context.get('noise_level', 0.1)
        user_patience = context.get('user_patience', 'normal')  # 'low', 'normal', 'high'

        new_strategy = self._select_optimal_strategy(
            current_accuracy, current_latency, interaction_mode,
            noise_level, user_patience
        )

        if new_strategy != self.current_strategy:
            self.logger.info(f"策略切换: {self.current_strategy.value} -> {new_strategy.value}")
            self.current_strategy = new_strategy

    def _select_optimal_strategy(self, accuracy: float, latency: float,
                                 interaction_mode: str, noise_level: float,
                                 user_patience: str) -> ChunkStrategy:
        """Pick a strategy from metrics and context (``latency`` is unused)."""
        # Fast response: quick Q&A, accuracy already good, impatient user.
        if (interaction_mode == 'quick_qa' and accuracy > 0.85 and
                user_patience == 'low'):
            return ChunkStrategy.FAST_RESPONSE

        # High accuracy: noisy input, poor accuracy, or detailed analysis.
        if (noise_level > 0.3 or accuracy < 0.7 or
                interaction_mode == 'detailed_analysis'):
            return ChunkStrategy.HIGH_ACCURACY

        # Adaptive only when explicitly enabled in the config.
        if self.config.get('enable_adaptive_strategy', False):
            return ChunkStrategy.ADAPTIVE

        # Default: balanced.
        return ChunkStrategy.BALANCED

    @staticmethod
    def _align_down(size: int, frame: int) -> int:
        """Round ``size`` down to a multiple of ``frame`` bytes."""
        return size - (size % frame)

    def _create_chunks(self, audio_data: bytes, timestamp: float) -> List[AudioChunk]:
        """Slice ``audio_data`` into overlapping chunks.

        Chunk and overlap sizes are rounded down to whole samples so no chunk
        starts in the middle of a sample.  Slicing stops as soon as a chunk
        reaches the end of the buffer, so no tail chunk made only of overlap
        is produced.

        Returns:
            The chunks in buffer order; ``[]`` for empty input or invalid
            ``sample_rate`` / ``bytes_per_sample`` settings.
        """
        current_config = self.chunk_strategies[self.current_strategy]

        data_length = len(audio_data)
        sample_rate = self.config.get('sample_rate', 16000)
        bytes_per_sample = self.config.get('bytes_per_sample', 2)

        # Guard the divisions below.
        if sample_rate <= 0 or bytes_per_sample <= 0:
            self.logger.error(f"无效的音频参数: sample_rate={sample_rate}, bytes_per_sample={bytes_per_sample}")
            return []
        if data_length == 0:
            return []

        bytes_per_second = sample_rate * bytes_per_sample
        audio_duration = data_length / bytes_per_second
        chunk_duration = self._calculate_optimal_chunk_duration(
            audio_duration, current_config
        )

        # Sizes in bytes, aligned to sample boundaries.
        frame = max(1, int(bytes_per_sample))
        chunk_size = max(
            frame, self._align_down(int(chunk_duration * bytes_per_second), frame)
        )
        overlap_size = max(
            0, self._align_down(int(chunk_size * current_config.overlap_ratio), frame)
        )

        # Distance between consecutive chunk starts; must be positive.
        step = chunk_size - overlap_size
        if step <= 0:
            step = max(frame, self._align_down(chunk_size // 4, frame))
            self.logger.warning(f"调整分片位置以防止无限循环: overlap_size={overlap_size}, step={step}")

        chunks = []
        start_pos = 0
        id_prefix = int(timestamp * 1000)
        max_iterations = 1000  # safety cap on chunks per call

        for chunk_index in range(max_iterations):
            end_pos = min(start_pos + chunk_size, data_length)
            chunk_start_time = timestamp + (start_pos / bytes_per_second)
            chunk_end_time = timestamp + (end_pos / bytes_per_second)

            chunks.append(AudioChunk(
                data=audio_data[start_pos:end_pos],
                duration=chunk_end_time - chunk_start_time,
                start_time=chunk_start_time,
                end_time=chunk_end_time,
                chunk_id=f"{id_prefix}_{chunk_index}",
                strategy=self.current_strategy,
                stage=RecognitionStage.IMMEDIATE,
                timestamp=chunk_start_time
            ))

            # The buffer is fully covered; anything further would only
            # repeat the overlap region.
            if end_pos >= data_length:
                break
            start_pos += step
        else:
            # The cap was hit with data still left.
            self.logger.error(f"分片创建达到最大迭代次数限制: {max_iterations}")

        return chunks

    def _calculate_optimal_chunk_duration(self, total_duration: float,
                                          config: ChunkConfig) -> float:
        """Return the chunk duration (seconds) clamped to the config's range.

        NOTE(review): with no recorded results the monitor reports accuracy
        0.0, so the 1.2x "low accuracy" factor applies until results are
        recorded — confirm this is intended.
        """
        # Base duration: a third of the buffer, clamped to the config range.
        base_duration = min(config.max_duration,
                            max(config.min_duration, total_duration / 3))

        recent_accuracy = self.performance_monitor.get_recent_accuracy()
        recent_latency = self.performance_monitor.get_recent_latency()

        if recent_accuracy < self.strategy_switch_threshold:
            # Low accuracy: lengthen chunks.
            adjustment_factor = 1.2
        elif recent_latency > 2.0:
            # High latency: shorten chunks.
            adjustment_factor = 0.8
        else:
            adjustment_factor = 1.0

        optimal_duration = base_duration * adjustment_factor

        return max(config.min_duration,
                   min(config.max_duration, optimal_duration))

    @staticmethod
    def _to_pcm16_bytes(audio_data) -> bytes:
        """Convert an ndarray / bytes / bytes-like object to raw bytes.

        Float arrays are scaled to the int16 range and clipped so that
        out-of-range samples saturate instead of wrapping around.
        """
        if isinstance(audio_data, np.ndarray):
            if np.issubdtype(audio_data.dtype, np.floating):
                # float16 cannot represent 32767 exactly; widen first.
                if audio_data.dtype.itemsize < 4:
                    audio_data = audio_data.astype(np.float32)
                # Heuristic kept from the original code: data lying entirely
                # in [0, 1] is treated as unsigned-normalised audio.
                # NOTE(review): an all-zero buffer in [-1, 1] convention also
                # matches this branch — confirm the intended input range.
                if (audio_data.size and audio_data.max() <= 1.0
                        and audio_data.min() >= 0.0):
                    scaled = (audio_data * 2 - 1) * 32767
                else:
                    scaled = audio_data * 32767
                audio_data = np.clip(scaled, -32768, 32767)
            # tobytes() returns an independent copy of the samples.
            return audio_data.astype(np.int16).tobytes()
        if isinstance(audio_data, bytes):
            return audio_data
        return bytes(audio_data)

    def process_audio_chunk(self, audio_data) -> Optional[AudioChunk]:
        """Process one audio buffer and return its first chunk.

        Args:
            audio_data: ``numpy.ndarray`` (float or integer samples),
                ``bytes``, or any object accepted by ``bytes()``.

        Returns:
            The first chunk produced, or ``None`` when no chunk was produced
            or an error occurred.
        """
        try:
            audio_bytes = self._to_pcm16_bytes(audio_data)
            timestamp = time.time()
            chunks = self.process_audio_data(audio_bytes, timestamp)
            return chunks[0] if chunks else None
        except Exception as e:
            self.logger.error(f"处理音频分片失败: {e}")
            return None

    def select_optimal_strategy(self) -> ChunkStrategy:
        """Pick a strategy from the monitor's recent metrics only."""
        try:
            recent_accuracy = self.performance_monitor.get_recent_accuracy()
            recent_latency = self.performance_monitor.get_recent_latency()
            recent_confidence = self.performance_monitor.get_recent_confidence()

            if recent_accuracy < 0.7 or recent_confidence < 0.6:
                return ChunkStrategy.HIGH_ACCURACY
            elif recent_latency > 1.0:
                return ChunkStrategy.FAST_RESPONSE
            elif recent_accuracy > 0.9 and recent_latency < 0.5:
                return ChunkStrategy.ADAPTIVE
            else:
                return ChunkStrategy.BALANCED
        except Exception as e:
            self.logger.error(f"选择最优策略失败: {e}")
            return ChunkStrategy.BALANCED

    def _cleanup_memory(self):
        """Trim the chunk buffer, callback list and processing queue."""
        try:
            # Halve an oversized chunk_buffer, keeping the newest entries.
            max_buffer_size = self.config.get('max_chunk_buffer_size', 50)
            if len(self.chunk_buffer) > max_buffer_size:
                self.chunk_buffer = self.chunk_buffer[-max_buffer_size//2:]
                self.logger.info(f"清理chunk_buffer,保留最新的{max_buffer_size//2}个分片")

            # Keep only the 10 most recently registered callbacks.
            if len(self.quality_callbacks) > 10:
                self.quality_callbacks = self.quality_callbacks[-10:]
                self.logger.info("清理过多的质量回调函数")

            # Drop queued chunks whose start_time is 60 s or more in the past.
            current_time = time.time()
            old_queue_size = len(self.processing_queue)
            self.processing_queue = deque([
                chunk for chunk in self.processing_queue
                if current_time - chunk.start_time < 60.0
            ])

            if old_queue_size != len(self.processing_queue):
                self.logger.info(f"清理处理队列,从{old_queue_size}个分片减少到{len(self.processing_queue)}个")

        except Exception as e:
            self.logger.error(f"内存清理失败: {e}")
class ProgressiveRecognition:
    """Progressive recognition processor.

    Runs up to three recognition passes on a chunk; a pass runs only when the
    chunk's duration reaches that stage's threshold.  The three recognisers
    are simulations that return fixed scores; a real ASR call is meant to
    replace them.
    """

    def __init__(self, config: Optional[Dict] = None):
        self.config = config or {}
        # Minimum chunk duration (seconds) required for each stage.
        self.recognition_stages = {
            RecognitionStage.IMMEDIATE: 0.8,  # 800 ms, quick pass
            RecognitionStage.REFINED: 2.0,    # 2 s, refined pass
            RecognitionStage.FINAL: 4.0       # 4 s, final pass
        }
        # chunk_id -> {stage: RecognitionResult}
        self.stage_results = {}
        self.logger = logging.getLogger(__name__)

    def process_audio_segment(self, chunk: AudioChunk) -> Dict[RecognitionStage, RecognitionResult]:
        """Run every stage the chunk is long enough for.

        Non-empty results are stored in ``stage_results`` under the chunk id;
        once more than 100 chunk ids are stored, entries older than 60 s are
        purged.

        Returns:
            Mapping of stage to result; ``{}`` if any step raised.
        """
        results = {}
        try:
            pipeline = (
                (RecognitionStage.IMMEDIATE, self._quick_recognition),
                (RecognitionStage.REFINED, self._refined_recognition),
                (RecognitionStage.FINAL, self._final_recognition),
            )
            for stage, recognize in pipeline:
                if chunk.duration >= self.recognition_stages[stage]:
                    outcome = recognize(chunk)
                    if outcome:
                        results[stage] = outcome

            if results:
                self.stage_results[chunk.chunk_id] = results

            # Periodic purge so stage_results does not grow without bound.
            if len(self.stage_results) > 100:
                self.cleanup_old_results(max_age=60.0)

            return results

        except Exception as e:
            self.logger.error(f"渐进式识别处理出错: {e}")
            return {}

    @staticmethod
    def _simulated_result(chunk: AudioChunk, stage: RecognitionStage, label: str,
                          confidence: float, processing_time: float,
                          accuracy_score: float) -> RecognitionResult:
        """Build a placeholder result; no waiting or real recognition happens."""
        return RecognitionResult(
            text=f"{label}_{chunk.chunk_id}",
            confidence=confidence,
            chunk_id=chunk.chunk_id,
            stage=stage,
            processing_time=processing_time,
            accuracy_score=accuracy_score
        )

    def _quick_recognition(self, chunk: AudioChunk) -> Optional[RecognitionResult]:
        """Quick recognition (simulated, reports 100 ms processing time)."""
        # A real ASR service call belongs here.
        return self._simulated_result(
            chunk, RecognitionStage.IMMEDIATE, "快速识别结果",
            confidence=0.6, processing_time=0.1, accuracy_score=0.7
        )

    def _refined_recognition(self, chunk: AudioChunk) -> Optional[RecognitionResult]:
        """Refined recognition (simulated, reports 300 ms processing time)."""
        return self._simulated_result(
            chunk, RecognitionStage.REFINED, "精化识别结果",
            confidence=0.8, processing_time=0.3, accuracy_score=0.85
        )

    def _final_recognition(self, chunk: AudioChunk) -> Optional[RecognitionResult]:
        """Final recognition (simulated, reports 500 ms processing time)."""
        return self._simulated_result(
            chunk, RecognitionStage.FINAL, "最终识别结果",
            confidence=0.9, processing_time=0.5, accuracy_score=0.95
        )

    def get_best_result(self, chunk_id: str) -> Optional[RecognitionResult]:
        """Return the stored result of the latest stage for ``chunk_id``.

        Preference order is FINAL, then REFINED, then IMMEDIATE; ``None`` is
        returned when nothing is stored for the id.
        """
        results = self.stage_results.get(chunk_id)
        if results is None:
            return None

        for stage in (RecognitionStage.FINAL, RecognitionStage.REFINED, RecognitionStage.IMMEDIATE):
            if stage in results:
                return results[stage]

        return None

    def cleanup_old_results(self, max_age: float = 300.0):
        """Delete stored results older than ``max_age`` seconds.

        The age is derived from the chunk id, whose part before the first
        ``_`` is parsed as a millisecond timestamp.  Ids that cannot be
        parsed are kept.
        """
        current_time = time.time()
        expired_chunks = []

        for chunk_id in self.stage_results:
            try:
                chunk_timestamp = float(chunk_id.split('_')[0]) / 1000.0  # ms -> s
            except (ValueError, IndexError):
                # Unparseable id: keep the entry.
                continue
            if current_time - chunk_timestamp > max_age:
                expired_chunks.append(chunk_id)

        for chunk_id in expired_chunks:
            del self.stage_results[chunk_id]

        if expired_chunks:
            self.logger.info(f"清理了 {len(expired_chunks)} 个过期识别结果")
class ChunkQualityAssessor:
    """Chunk quality assessor.

    The current scoring is a placeholder built from the chunk's duration and
    strategy; no audio-signal analysis is performed.
    """

    def __init__(self):
        # Metric slots, all starting at 0.0.
        self.quality_metrics = dict.fromkeys(
            (
                'signal_to_noise_ratio',
                'audio_clarity',
                'speech_continuity',
                'duration_appropriateness',
            ),
            0.0,
        )

    def assess_chunk_quality(self, chunk: AudioChunk) -> float:
        """Return the chunk's quality score, clamped to [0.0, 1.0].

        The score is the mean of the duration score and the strategy score.
        """
        duration_part = self._assess_duration_quality(chunk.duration)
        strategy_part = self._assess_strategy_appropriateness(chunk.strategy)
        combined = (duration_part + strategy_part) / 2
        return min(1.0, max(0.0, combined))

    def _assess_duration_quality(self, duration: float) -> float:
        """Score a duration: 1.0 in [1, 3] s, 0.7 in [0.5, 1) or (3, 5] s, else 0.3."""
        # Ideal range.
        if 1.0 <= duration <= 3.0:
            return 1.0
        # Tolerable range on either side of the ideal one.
        if 0.5 <= duration <= 5.0:
            return 0.7
        return 0.3

    def _assess_strategy_appropriateness(self, strategy: ChunkStrategy) -> float:
        """Return the fixed score of ``strategy`` (0.5 for an unknown one)."""
        fixed_scores = {
            ChunkStrategy.FAST_RESPONSE: 0.8,
            ChunkStrategy.BALANCED: 0.9,
            ChunkStrategy.HIGH_ACCURACY: 0.85,
            ChunkStrategy.ADAPTIVE: 0.95
        }
        try:
            return fixed_scores[strategy]
        except KeyError:
            return 0.5