# streaming_recorder.py
# AIfeng/2025-07-07 09:34:55
# Streaming recorder - integrates streaming VAD with recognition result management
# Core features: cumulative (continuously concatenated) recognition, intelligent speech segmentation, streaming result handling
import pyaudio
import wave
import threading
import time
import tempfile
import os
import sys
import uuid
import struct
from datetime import datetime
from typing import Optional, Callable, Dict, Any

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from .streaming_vad import StreamingVAD
from .streaming_recognition_manager import StreamingRecognitionManager
from .optimization.optimization_manager import OptimizationManager, OptimizationMode
from funasr_asr_sync import FunASRSync
from logger import get_logger

logger = get_logger("StreamingRecorder")


class StreamingRecorder:
    """Streaming recorder.

    Integrates the streaming VAD and the recognition result manager to provide:
    1. Cumulative recognition with continuous concatenation
    2. Intelligent speech segmentation
    3. Management of partial and final recognition results
    4. Asynchronous ASR processing
    5. Real-time status notifications
    """

    def __init__(self,
                 chunk=1024,
                 format=pyaudio.paInt16,
                 channels=1,
                 rate=16000,
                 # VAD parameters
                 volume_threshold=0.03,
                 silence_duration=1.5,
                 min_speech_duration=0.5,
                 max_speech_duration=30.0,
                 pre_buffer_duration=0.5,
                 dynamic_threshold_factor=0.8,
                 partial_result_interval=2.0,
                 # Recognition manager parameters
                 confidence_threshold=0.6,
                 max_session_duration=60.0,
                 result_merge_window=1.0,
                 # ASR parameters
                 username="streaming_user",
                 # Configuration
                 config=None):
        """
        Initialize the streaming recorder.

        Args:
            chunk: audio chunk size (frames per buffer)
            format: audio sample format
            channels: number of channels
            rate: sample rate
            volume_threshold: base volume threshold
            silence_duration: silence duration threshold (seconds)
            min_speech_duration: minimum speech duration (seconds)
            max_speech_duration: maximum speech duration (seconds)
            pre_buffer_duration: pre-buffer length (seconds)
            dynamic_threshold_factor: dynamic threshold factor
            partial_result_interval: interval between partial result submissions (seconds)
            confidence_threshold: confidence threshold
            max_session_duration: maximum session duration (seconds)
            result_merge_window: result merge window (seconds)
            username: user name
            config: optional configuration dict (see the 'streaming_recorder' section below)
        """
        # Audio parameters
        self.chunk = chunk
        self.format = format
        self.channels = channels
        self.rate = rate
        self.username = username

        # Audio gain parameters
        recorder_config = config.get('streaming_recorder', {}) if config else {}
        self.volume_gain = recorder_config.get('audio_gain', 3.0)
        self.enable_gain = recorder_config.get('enable_audio_gain', True)
        logger.info(f"Audio gain config - enabled: {self.enable_gain}, gain factor: {self.volume_gain}x")

        # Recording state
        self.is_recording_flag = False
        self.recording_thread = None
        self.audio = None
        self.stream = None

        # Initialize the streaming VAD
        self.vad = StreamingVAD(
            sample_rate=rate,
            chunk_size=chunk,
            volume_threshold=volume_threshold,
            silence_duration=silence_duration,
            min_speech_duration=min_speech_duration,
            max_speech_duration=max_speech_duration,
            pre_buffer_duration=pre_buffer_duration,
            dynamic_threshold_factor=dynamic_threshold_factor,
            partial_result_interval=partial_result_interval
        )

        # Initialize the recognition result manager
        self.recognition_manager = StreamingRecognitionManager(
            confidence_threshold=confidence_threshold,
            max_session_duration=max_session_duration,
            result_merge_window=result_merge_window
        )

        # Initialize the optimization manager
        self.optimization_manager = OptimizationManager()
        self.optimization_manager.set_optimization_mode(OptimizationMode.BALANCED)

        # ASR client
        self.asr_client = FunASRSync(username)

        # Current session
        self.current_session_id = None
        self.pending_asr_requests = {}  # Tracks in-flight ASR requests

        # VAD callbacks
        self.vad.on_speech_start = self._on_speech_start
        self.vad.on_speech_continue = self._on_speech_continue
        self.vad.on_speech_end = self._on_speech_end
        self.vad.on_partial_result_ready = self._on_partial_result_ready

        # Recognition manager callbacks
        self.recognition_manager.on_partial_result = self._on_partial_recognition_result
        self.recognition_manager.on_final_result = self._on_final_recognition_result
        self.recognition_manager.on_result_updated = self._on_recognition_result_updated
        self.recognition_manager.on_session_complete = self._on_session_complete

        # Optimization manager callbacks
        self.optimization_manager.register_result_callback(self._on_optimization_result)
        self.optimization_manager.register_error_callback(self._on_optimization_error)
        self.optimization_manager.register_metrics_callback(self._on_optimization_metrics)

        # External callbacks
        self.on_speech_detected: Optional[Callable] = None
        self.on_partial_result: Optional[Callable] = None
        self.on_final_result: Optional[Callable] = None
        self.on_session_complete: Optional[Callable] = None
        self.on_status_update: Optional[Callable] = None

        logger.info(f"StreamingRecorder initialized - user: {username}")

    def _on_speech_start(self, vad_result: Dict[str, Any]):
        """VAD detected the start of speech."""
        logger.info("Speech start detected, creating a new session")

        # Create a new recognition session
        self.current_session_id = str(uuid.uuid4())
        self.recognition_manager.create_session(
            self.current_session_id,
            metadata={
                'start_time': time.time(),
                'speech_duration': vad_result['speech_duration'],
                'volume': vad_result['volume']
            }
        )

        # Notify external listeners
        if self.on_speech_detected:
            self.on_speech_detected('start', vad_result)

        # Submit the initial audio for recognition
        self._send_audio_for_recognition(vad_result['audio_buffer'], 'partial')

    def _on_speech_continue(self, vad_result: Dict[str, Any]):
        """VAD detected that speech is continuing."""
        logger.debug(f"Speech continuing - duration: {vad_result['speech_duration']:.2f}s")

        # Notify external listeners
        if self.on_speech_detected:
            self.on_speech_detected('continue', vad_result)

    def _on_speech_end(self, vad_result: Dict[str, Any]):
        """VAD detected the end of speech."""
        speech_duration = vad_result['speech_duration']
        audio_buffer_size = len(vad_result['audio_buffer']) if vad_result['audio_buffer'] else 0
        logger.info(f"Speech end detected - duration: {speech_duration:.2f}s, audio buffer size: {audio_buffer_size} frames")

        # Detailed debugging checks
        if audio_buffer_size == 0:
            logger.warning("Speech ended but the audio buffer is empty, skipping")
            return

        if speech_duration < 0.1:
            logger.warning(f"Speech too short ({speech_duration:.2f}s), probably noise")

        # Submit the final audio for recognition
        logger.debug(f"Submitting final audio for recognition - session: {self.current_session_id}")
        self._send_audio_for_recognition(vad_result['audio_buffer'], 'final')

        # Notify external listeners
        if self.on_speech_detected:
            self.on_speech_detected('end', vad_result)

    def _on_partial_result_ready(self, vad_result: Dict[str, Any]):
        """VAD is ready to emit a partial recognition result."""
        logger.debug(f"Submitting partial recognition result - duration: {vad_result['speech_duration']:.2f}s")

        # Submit the partial audio for recognition
        self._send_audio_for_recognition(vad_result['audio_buffer'], 'partial')

    def _send_audio_for_recognition(self, audio_buffer: list, result_type: str):
        """Submit audio data for recognition (routed through the optimization pipeline)."""
        if not audio_buffer:
            logger.warning(f"Audio buffer is empty, cannot submit {result_type} recognition request")
            return

        if not self.current_session_id:
            logger.warning(f"No active session, cannot submit {result_type} recognition request")
            return

        try:
            # Convert the buffered frames to raw bytes
            audio_bytes = b''.join(audio_buffer)
            audio_size_bytes = len(audio_bytes)
            audio_duration = audio_size_bytes / (self.rate * 2)  # 16-bit audio, 2 bytes per sample
            logger.info(f"Submitting {result_type} audio for recognition - session: {self.current_session_id}, size: {audio_size_bytes} bytes, duration: {audio_duration:.2f}s")

            # Build the processing context for the optimization manager
            processing_context = {
                'session_id': self.current_session_id,
                'audio_data': audio_bytes,
                'sample_rate': self.rate,
                'timestamp': time.time(),
                'result_type': result_type,
                'metadata': {
                    'channels': self.channels,
                    'format': self.format,
                    'chunk_size': self.chunk,
                    'audio_duration': audio_duration,
                    'audio_size_bytes': audio_size_bytes
                }
            }

            # Process the audio asynchronously to avoid blocking the recording loop
            logger.debug(f"Starting async audio processing thread - {result_type}")
            threading.Thread(
                target=self._process_audio_with_optimization,
                args=(processing_context,),
                daemon=True
            ).start()
        except Exception as e:
            logger.error(f"Audio processing failed: {e}", exc_info=True)

    def _process_audio_with_optimization(self, context: Dict[str, Any]):
        """Process audio through the optimization manager."""
        session_id = context['session_id']
        result_type = context['result_type']
        audio_size = len(context['audio_data'])
        logger.debug(f"Starting optimized audio processing - session: {session_id}, type: {result_type}, size: {audio_size} bytes")

        try:
            # Submit the audio to the optimization manager (asynchronous; results arrive via callbacks)
            logger.debug(f"Calling optimization manager - {session_id}")
            success = self.optimization_manager.process_audio(
                session_id=context['session_id'],
                audio_data=context['audio_data'],
                sample_rate=context['sample_rate']
            )
            logger.info(f"Optimization manager submission result: {success} - session: {session_id}")

            # OptimizationManager.process_audio() is asynchronous; the boolean only indicates whether
            # the job was accepted, and the actual result is delivered through the registered callbacks.
            if not success:
                # Submission failed: fall back to the original processing path
                logger.warning(f"Optimization manager rejected the job, falling back to original processing - session: {session_id}")
                self._fallback_audio_processing(context)
            else:
                # Also run the fallback path so the audio file is saved for debugging
                logger.info(f"Optimization job accepted, also running fallback processing to save the audio file - session: {session_id}")
                self._fallback_audio_processing(context)
        except Exception as e:
            logger.error(f"Optimized audio processing failed: {e}, falling back to original processing - session: {session_id}", exc_info=True)
            self._fallback_audio_processing(context)

    def _fallback_audio_processing(self, context: Dict[str, Any]):
        """Fallback audio processing path."""
        session_id = context['session_id']
        result_type = context['result_type']
        audio_size = len(context['audio_data'])
        logger.info(f"Starting fallback audio processing - session: {session_id}, type: {result_type}, size: {audio_size} bytes")

        try:
            # Create a timestamped audio file for testing
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3]  # millisecond precision
            temp_filename = os.path.join('cache_data', f'audio_{result_type}_{timestamp}.wav')
            logger.debug(f"Saving audio file: {temp_filename}")

            # Make sure the cache directory exists
            cache_dir = 'cache_data'
            if not os.path.exists(cache_dir):
                logger.info(f"Creating cache directory: {cache_dir}")
                os.makedirs(cache_dir, exist_ok=True)
            else:
                logger.debug(f"Cache directory already exists: {cache_dir}")

            # Write the audio to disk
            logger.debug(f"Writing audio file: {temp_filename}")
            with wave.open(temp_filename, 'wb') as wf:
                wf.setnchannels(self.channels)
                wf.setsampwidth(pyaudio.get_sample_size(self.format))
                wf.setframerate(self.rate)
                wf.writeframes(context['audio_data'])

            # Verify the file was created
            if os.path.exists(temp_filename):
                file_size = os.path.getsize(temp_filename)
                logger.info(f"Audio file saved: {temp_filename}, file size: {file_size} bytes")
            else:
                logger.error(f"Failed to save audio file: {temp_filename}")
                return

            # Compute the audio duration
            audio_duration = len(context['audio_data']) / (self.rate * 2)  # 16-bit audio, 2 bytes per sample

            # Generate a request ID and register the pending request
            request_id = str(uuid.uuid4())
            self.pending_asr_requests[request_id] = {
                'session_id': session_id,
                'result_type': result_type,
                'audio_file': temp_filename,
                'audio_duration': audio_duration,
                'timestamp': time.time()
            }

            logger.info(f"Submitting audio for {result_type} recognition: {temp_filename}, duration: {audio_duration:.2f}s, request ID: {request_id}")

            # Hand off to ASR asynchronously
            logger.debug(f"Starting ASR processing thread - request ID: {request_id}")
            threading.Thread(
                target=self._process_asr_request,
                args=(request_id, temp_filename),
                daemon=True
            ).start()
        except Exception as e:
            logger.error(f"Fallback audio processing failed - session: {session_id}: {e}", exc_info=True)

    def _on_optimization_result(self, result: Dict[str, Any]):
        """Handle a result callback from the optimization manager."""
        try:
            session_id = result.get('session_id')
            text = result.get('text', '')
            confidence = result.get('confidence', 0.0)
            result_type = result.get('result_type', 'partial')

            logger.info(f"Optimization result [{session_id}] ({result_type}): {text}")

            # Forward to the external callbacks
            if result_type == 'partial' and self.on_partial_result:
                self.on_partial_result(session_id, text, confidence)
            elif result_type == 'final' and self.on_final_result:
                self.on_final_result(session_id, text, confidence)
        except Exception as e:
            logger.error(f"Failed to handle optimization result: {e}")

    def _on_optimization_error(self, session_id: str, error: Exception):
        """Handle an error callback from the optimization manager."""
        try:
            error_msg = str(error)
            error_type = type(error).__name__
            logger.error(f"Optimization error [{session_id}] ({error_type}): {error_msg}")

            # Push a status update
            if self.on_status_update:
                self.on_status_update({
                    'type': 'optimization_error',
                    'session_id': session_id,
                    'error_type': error_type,
                    'message': error_msg
                })
        except Exception as e:
            logger.error(f"Failed to handle optimization error: {e}")

    def _on_optimization_metrics(self, metrics: Dict[str, Any]):
        """Handle a performance metrics callback from the optimization manager."""
        try:
            session_id = metrics.get('session_id', 'unknown')
            latency = metrics.get('total_latency_ms', 0.0)
            accuracy = metrics.get('accuracy_score', 0.0)
            logger.debug(f"Optimization metrics [{session_id}]: latency={latency:.1f}ms, accuracy={accuracy:.2f}")

            # Push a status update
            if self.on_status_update:
                self.on_status_update({
                    'type': 'optimization_metrics',
                    'session_id': session_id,
                    'metrics': metrics
                })
        except Exception as e:
            logger.error(f"Failed to handle optimization metrics: {e}")

    def set_optimization_mode(self, mode: OptimizationMode):
        """Set the optimization mode."""
        try:
            self.optimization_manager.set_optimization_mode(mode)
            logger.info(f"Optimization mode set to: {mode.value}")

            # Push a status update
            if self.on_status_update:
                self.on_status_update({
                    'type': 'optimization_mode_changed',
                    'mode': mode.value
                })
        except Exception as e:
            logger.error(f"Failed to set optimization mode: {e}")

    def get_optimization_metrics(self) -> Dict[str, Any]:
        """Return the optimization performance metrics."""
        try:
            return self.optimization_manager.get_performance_metrics()
        except Exception as e:
            logger.error(f"Failed to get optimization metrics: {e}")
            return {}

    def _process_asr_request(self, request_id: str, audio_file: str):
        """Process an ASR recognition request."""
        logger.info(f"Processing ASR request - request ID: {request_id}, audio file: {audio_file}")
        try:
            # Make sure the audio file exists
            if not os.path.exists(audio_file):
                logger.error(f"Audio file does not exist: {audio_file}")
                self._handle_asr_error(request_id, f"Audio file does not exist: {audio_file}")
                return

            # Check the audio file size
            file_size = os.path.getsize(audio_file)
            logger.debug(f"Audio file check passed - size: {file_size} bytes")

            # Check the ASR client connection and reconnect if necessary
            if not hasattr(self.asr_client, 'is_connected') or not self.asr_client.is_connected():
                logger.warning("ASR client is not connected, attempting to reconnect")
                try:
                    self.asr_client.start()
                    logger.info("ASR client reconnected")
                except Exception as conn_e:
                    logger.error(f"ASR client connection failed: {conn_e}")
                    self._handle_asr_error(request_id, f"ASR client connection failed: {conn_e}")
                    return

            # Reset the ASR client state
            self.asr_client.done = False
            self.asr_client.finalResults = None

            # Send the audio file using the URL format FunASR expects
            logger.info(f"Sending audio file to FunASR: {audio_file}")
            self.asr_client.send_url(audio_file)

            # Wait for the recognition result (adjust this to the actual ASR client implementation).
            # FunASRSync is asynchronous and delivers results via its own callbacks, so here we poll
            # its state and associate the result with this request.
            start_time = time.time()
            timeout = 15.0  # 15-second timeout
            check_interval = 0.2  # polling interval

            logger.debug(f"Waiting for ASR result - timeout: {timeout}s")
            while time.time() - start_time < timeout:
                elapsed = time.time() - start_time

                # Check the ASR client state
                if hasattr(self.asr_client, 'done') and self.asr_client.done:
                    if hasattr(self.asr_client, 'finalResults') and self.asr_client.finalResults:
                        # A recognition result is available
                        result_text = self.asr_client.finalResults
                        logger.info(f"Received ASR result [{request_id}]: {result_text}")

                        # Handle the recognition result
                        self._handle_asr_result(request_id, result_text)
                        return
                    else:
                        logger.debug(f"ASR finished but returned no result - request ID: {request_id}, elapsed: {elapsed:.2f}s")

                # Periodically report that we are still waiting
                if int(elapsed) % 3 == 0 and elapsed > 0:
                    logger.debug(f"Still waiting for ASR result... request ID: {request_id}, waited: {elapsed:.1f}s")

                time.sleep(check_interval)
            else:
                # The while loop ran to completion without returning, so the request timed out
                logger.warning(f"ASR recognition timed out - request ID: {request_id}, timeout: {timeout}s")
                self._handle_asr_timeout(request_id)
        except Exception as e:
            logger.error(f"Failed to process ASR request [{request_id}]: {e}", exc_info=True)
            self._handle_asr_error(request_id, str(e))
        finally:
            # Keep the temporary file during testing so it can be inspected later
            logger.debug(f"ASR request finished - request ID: {request_id}")
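
    # Design note: the polling loop in _process_asr_request watches FunASRSync's `done` /
    # `finalResults` attributes every 0.2 s. If the ASR client exposes (or is extended with) a
    # completion hook, an event-based wait removes that polling latency. Hypothetical sketch only;
    # `on_result` is an assumed hook, not a documented FunASRSync attribute:
    #
    #   done_event = threading.Event()
    #   results = []
    #   self.asr_client.on_result = lambda text: (results.append(text), done_event.set())
    #   if done_event.wait(timeout=15.0):
    #       self._handle_asr_result(request_id, results[0])
    #   else:
    #       self._handle_asr_timeout(request_id)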

    def _handle_asr_result(self, request_id: str, result_text: str):
        """Handle an ASR recognition result."""
        if request_id not in self.pending_asr_requests:
            logger.warning(f"Unknown ASR request: {request_id}")
            return

        request_info = self.pending_asr_requests[request_id]
        session_id = request_info['session_id']
        result_type = request_info['result_type']
        audio_duration = request_info['audio_duration']

        logger.info(f"Received ASR result [{result_type}]: {result_text}")

        # Add the result to the recognition result manager
        if result_type == 'partial':
            self.recognition_manager.add_partial_result(
                session_id, result_text, confidence=0.8, audio_duration=audio_duration
            )
        else:  # final
            self.recognition_manager.add_final_result(
                session_id, result_text, confidence=0.9, audio_duration=audio_duration
            )
            # A final result completes the session
            self.recognition_manager.complete_session(session_id)
            self.current_session_id = None

        # Clean up the pending request
        del self.pending_asr_requests[request_id]

    def _handle_asr_timeout(self, request_id: str):
        """Handle an ASR timeout."""
        if request_id in self.pending_asr_requests:
            logger.warning(f"ASR request timed out: {request_id}")
            del self.pending_asr_requests[request_id]

    def _handle_asr_error(self, request_id: str, error_msg: str):
        """Handle an ASR error."""
        if request_id in self.pending_asr_requests:
            logger.error(f"ASR request error [{request_id}]: {error_msg}")
            del self.pending_asr_requests[request_id]

    def _on_partial_recognition_result(self, session_id: str, result):
        """Handle a partial recognition result."""
        logger.debug(f"Partial recognition result [{session_id}]: {result.text}")

        if self.on_partial_result:
            self.on_partial_result(session_id, result.text, result.confidence)

    def _on_final_recognition_result(self, session_id: str, result):
        """Handle a final recognition result."""
        logger.info(f"Final recognition result [{session_id}]: {result.text}")

        if self.on_final_result:
            self.on_final_result(session_id, result.text, result.confidence)

    def _on_recognition_result_updated(self, session_id: str, merged_text: str, update_type: str):
        """Handle a recognition result update."""
        logger.debug(f"Recognition result updated [{session_id}] ({update_type}): {merged_text}")

        if self.on_status_update:
            self.on_status_update({
                'type': 'result_update',
                'session_id': session_id,
                'text': merged_text,
                'update_type': update_type
            })

    def _on_session_complete(self, session_id: str, final_text: str):
        """Handle session completion."""
        logger.info(f"Session complete [{session_id}]: {final_text}")

        if self.on_session_complete:
            self.on_session_complete(session_id, final_text)

    def _apply_audio_gain(self, audio_data: bytes) -> bytes:
        """Apply audio gain to a raw PCM chunk."""
        if not self.enable_gain or self.volume_gain == 1.0:
            return audio_data

        try:
            # Interpret the byte buffer as an array of signed 16-bit samples
            count = len(audio_data) // 2
            if count == 0:
                return audio_data

            format_str = f"{count}h"
            shorts = struct.unpack(format_str, audio_data)

            # Apply the gain while guarding against overflow
            amplified_shorts = []
            for sample in shorts:
                amplified_sample = int(sample * self.volume_gain)
                # Clamp to the signed 16-bit range
                amplified_sample = max(-32768, min(32767, amplified_sample))
                amplified_shorts.append(amplified_sample)

            # Pack the samples back into bytes
            return struct.pack(format_str, *amplified_shorts)
        except Exception as e:
            logger.warning(f"Audio gain failed: {e}, using the original audio")
            return audio_data
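
    # Note: the per-sample loop in _apply_audio_gain is fine for 1024-sample chunks, but it can
    # become a hot spot at higher rates or larger chunks. A vectorized variant is sketched below;
    # it is a hypothetical alternative (assumes numpy is available) and is not wired into the class.
    def _apply_audio_gain_vectorized(self, audio_data: bytes) -> bytes:
        """Sketch of a numpy-based gain stage equivalent to _apply_audio_gain (assumes numpy)."""
        if not self.enable_gain or self.volume_gain == 1.0:
            return audio_data
        try:
            import numpy as np  # local import: numpy is an assumed, optional dependency
            samples = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32)
            # Scale and clamp to the signed 16-bit range, mirroring the loop-based implementation
            amplified = np.clip(samples * self.volume_gain, -32768, 32767).astype(np.int16)
            return amplified.tobytes()
        except Exception as e:
            logger.warning(f"Vectorized audio gain failed: {e}, using the original audio")
            return audio_data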

    def _recording_loop(self):
        """Main recording loop."""
        logger.info("Streaming recording loop started")

        while self.is_recording_flag:
            try:
                data = self.stream.read(self.chunk, exception_on_overflow=False)

                # Apply audio gain
                if self.enable_gain:
                    data = self._apply_audio_gain(data)

                # Run the frame through the VAD
                vad_result = self.vad.process_audio_frame(data)

                # Publish a status update
                if self.on_status_update:
                    self.on_status_update({
                        'type': 'vad_status',
                        'is_speaking': vad_result['is_speaking'],
                        'volume': vad_result['volume'],
                        'threshold': vad_result['threshold'],
                        'speech_duration': vad_result['speech_duration'],
                        'silence_duration': vad_result['silence_duration']
                    })
            except Exception as e:
                logger.error(f"Recording loop error: {e}")
                break

        logger.info("Streaming recording loop finished")

    def start_recording(self, device_index=None):
        """Start recording."""
        if self.is_recording_flag:
            logger.warning("Recording is already in progress")
            return False

        try:
            self.audio = pyaudio.PyAudio()

            # Open the audio input stream
            self.stream = self.audio.open(
                format=self.format,
                channels=self.channels,
                rate=self.rate,
                input=True,
                input_device_index=device_index,
                frames_per_buffer=self.chunk
            )

            # Reset state
            self.is_recording_flag = True
            self.current_session_id = None
            self.pending_asr_requests.clear()

            # Reset the VAD and the recognition manager
            self.vad.reset()
            self.recognition_manager.reset()

            # Start the ASR client
            if not self.asr_client.is_connected():
                self.asr_client.start()

            # Start the recording thread
            self.recording_thread = threading.Thread(target=self._recording_loop)
            self.recording_thread.start()

            logger.info("Streaming recording started")
            return True
        except Exception as e:
            logger.error(f"Failed to start streaming recording: {e}")
            self.stop_recording()
            return False

    def stop_recording(self):
        """Stop recording."""
        logger.info("Stopping streaming recording")
        self.is_recording_flag = False

        # Force the current speech segment to end
        if self.current_session_id:
            final_result = self.vad.force_end_speech()
            if final_result:
                self._send_audio_for_recognition(final_result['audio_buffer'], 'final')

        if self.recording_thread:
            self.recording_thread.join(timeout=2)

        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
            self.stream = None

        if self.audio:
            self.audio.terminate()
            self.audio = None

        # Shut down the ASR client
        self.asr_client.end()

        logger.info("Streaming recording stopped")

    def is_recording(self):
        """Return whether recording is in progress."""
        return self.is_recording_flag

    def get_status(self):
        """Return the recorder status."""
        vad_status = self.vad.get_status()
        recognition_status = self.recognition_manager.get_status()

        return {
            'is_recording': self.is_recording_flag,
            'current_session_id': self.current_session_id,
            'pending_asr_requests': len(self.pending_asr_requests),
            'asr_connected': self.asr_client.is_connected(),
            'vad_status': vad_status,
            'recognition_status': recognition_status
        }

    def get_current_result(self):
        """Return the current merged recognition result."""
        if self.current_session_id:
            return self.recognition_manager.get_merged_result(self.current_session_id)
        return ""

    def list_audio_devices(self):
        """List the available audio input devices."""
        audio = pyaudio.PyAudio()
        devices = []

        for i in range(audio.get_device_count()):
            device_info = audio.get_device_info_by_index(i)
            if device_info['maxInputChannels'] > 0:
                devices.append({
                    'index': i,
                    'name': device_info['name'],
                    'channels': device_info['maxInputChannels'],
                    'sample_rate': device_info['defaultSampleRate']
                })

        audio.terminate()
        return devices


if __name__ == "__main__":
    # Simple manual test
    def on_partial_result(session_id, text, confidence):
        print(f"[Partial result] {text} (confidence: {confidence:.2f})")

    def on_final_result(session_id, text, confidence):
        print(f"[Final result] {text} (confidence: {confidence:.2f})")

    def on_session_complete(session_id, final_text):
        print(f"[Session complete] {final_text}")

    def on_status_update(status):
        if status['type'] == 'vad_status' and status['is_speaking']:
            print(f"[VAD] speaking - volume: {status['volume']:.3f}, duration: {status['speech_duration']:.1f}s")

    recorder = StreamingRecorder(
        volume_threshold=0.03,
        silence_duration=1.5,
        min_speech_duration=0.5,
        max_speech_duration=30.0,
        partial_result_interval=2.0
    )

    # Register callbacks
    recorder.on_partial_result = on_partial_result
    recorder.on_final_result = on_final_result
    recorder.on_session_complete = on_session_complete
    recorder.on_status_update = on_status_update

    print("Available audio devices:")
    devices = recorder.list_audio_devices()
    for device in devices:
        print(f"  {device['index']}: {device['name']}")

    print("\nPress Enter to start streaming recording...")
    input()

    recorder.start_recording()
    print("Streaming recording... press Enter to stop")
    input()

    recorder.stop_recording()
    print("Streaming recording finished")