monitor_recording.py 7.62 KB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AIfeng/2025-01-02 10:27:06
录音过程监控脚本
实时监控录音缓存文件和识别结果
"""

import os
import sys
import time
import json
import wave
from datetime import datetime
from pathlib import Path

def monitor_cache_files():
    """监控缓存文件变化"""
    cache_dir = "cache_data"
    if not os.path.exists(cache_dir):
        os.makedirs(cache_dir, exist_ok=True)
    
    print(f"📁 监控目录: {os.path.abspath(cache_dir)}")
    print(f"⏰ 开始时间: {datetime.now().strftime('%H:%M:%S')}")
    print("=" * 60)
    
    last_files = set()
    file_info = {}  # 存储文件信息
    
    while True:
        try:
            current_files = set(os.listdir(cache_dir)) if os.path.exists(cache_dir) else set()
            
            # 检查新增文件
            new_files = current_files - last_files
            for file in new_files:
                file_path = os.path.join(cache_dir, file)
                if os.path.isfile(file_path):
                    stat = os.stat(file_path)
                    file_info[file] = {
                        'created': datetime.fromtimestamp(stat.st_ctime),
                        'size': stat.st_size,
                        'path': file_path
                    }
                    
                    print(f"🎵 新音频文件: {file}")
                    print(f"   📊 大小: {stat.st_size:,} bytes")
                    print(f"   ⏰ 创建: {datetime.fromtimestamp(stat.st_ctime).strftime('%H:%M:%S.%f')[:-3]}")
                    
                    # 分析WAV文件
                    try:
                        with wave.open(file_path, 'rb') as wf:
                            duration = wf.getnframes() / wf.getframerate()
                            print(f"   🎼 时长: {duration:.2f}秒, 采样率: {wf.getframerate()}Hz")
                    except Exception as e:
                        print(f"   ❌ WAV分析失败: {e}")
                    
                    print()
            
            # 检查删除的文件
            deleted_files = last_files - current_files
            for file in deleted_files:
                if file in file_info:
                    info = file_info[file]
                    existed_time = (datetime.now() - info['created']).total_seconds()
                    print(f"🗑️ 文件删除: {file}")
                    print(f"   ⏱️ 存在时长: {existed_time:.2f}秒")
                    print(f"   📊 文件大小: {info['size']:,} bytes")
                    print()
                    del file_info[file]
            
            last_files = current_files
            
            # 显示当前状态
            if len(current_files) > 0:
                print(f"\r📂 当前文件数: {len(current_files)} | 时间: {datetime.now().strftime('%H:%M:%S')}", end="", flush=True)
            else:
                print(f"\r⭕ 无缓存文件 | 时间: {datetime.now().strftime('%H:%M:%S')}", end="", flush=True)
            
            time.sleep(0.5)
            
        except KeyboardInterrupt:
            print("\n\n🛑 用户中断监控")
            break
        except Exception as e:
            print(f"\n❌ 监控出错: {e}")
            time.sleep(1)

def monitor_recognition_results():
    """监控识别结果日志"""
    log_file = "recognition_results.log"
    
    print(f"📝 监控识别结果: {os.path.abspath(log_file)}")
    
    if not os.path.exists(log_file):
        # 创建空日志文件
        with open(log_file, 'w', encoding='utf-8') as f:
            f.write(f"# 识别结果日志 - {datetime.now()}\n")
    
    # 获取文件初始大小
    last_size = os.path.getsize(log_file)
    
    while True:
        try:
            current_size = os.path.getsize(log_file)
            
            if current_size > last_size:
                # 文件有新内容
                with open(log_file, 'r', encoding='utf-8') as f:
                    f.seek(last_size)
                    new_content = f.read()
                    
                if new_content.strip():
                    lines = new_content.strip().split('\n')
                    for line in lines:
                        if line.strip() and not line.startswith('#'):
                            print(f"\n🎯 识别结果: {line}")
                
                last_size = current_size
            
            time.sleep(0.5)
            
        except KeyboardInterrupt:
            break
        except Exception as e:
            print(f"\n❌ 日志监控出错: {e}")
            time.sleep(1)

def analyze_existing_files():
    """分析现有的缓存文件"""
    cache_dir = "cache_data"
    
    print("🔍 分析现有缓存文件")
    print("=" * 40)
    
    if not os.path.exists(cache_dir):
        print("❌ 缓存目录不存在")
        return
    
    files = [f for f in os.listdir(cache_dir) if f.endswith('.wav')]
    
    if not files:
        print("📭 缓存目录为空")
        return
    
    print(f"📁 找到 {len(files)} 个音频文件:")
    
    for file in sorted(files):
        file_path = os.path.join(cache_dir, file)
        stat = os.stat(file_path)
        
        print(f"\n🎵 {file}")
        print(f"   📊 大小: {stat.st_size:,} bytes")
        print(f"   📅 创建: {datetime.fromtimestamp(stat.st_ctime)}")
        print(f"   📝 修改: {datetime.fromtimestamp(stat.st_mtime)}")
        
        # 分析WAV文件
        try:
            with wave.open(file_path, 'rb') as wf:
                channels = wf.getnchannels()
                sample_width = wf.getsampwidth()
                framerate = wf.getframerate()
                frames = wf.getnframes()
                duration = frames / framerate
                
                print(f"   🎼 格式: {channels}ch, {sample_width*8}bit, {framerate}Hz")
                print(f"   ⏱️ 时长: {duration:.2f}秒 ({frames:,} 帧)")
                
                # 计算文件年龄
                age = (datetime.now() - datetime.fromtimestamp(stat.st_mtime)).total_seconds()
                print(f"   🕐 文件年龄: {age:.1f}秒")
                
        except Exception as e:
            print(f"   ❌ WAV分析失败: {e}")
    
    print()

def show_system_info():
    """显示系统信息"""
    print("💻 系统信息")
    print("=" * 30)
    print(f"🐍 Python: {sys.version.split()[0]}")
    print(f"📂 工作目录: {os.getcwd()}")
    print(f"⏰ 当前时间: {datetime.now()}")
    
    # 检查关键目录
    dirs_to_check = ['cache_data', 'logs', 'audio', 'temp']
    print(f"\n📁 目录状态:")
    for dir_name in dirs_to_check:
        exists = os.path.exists(dir_name)
        if exists:
            file_count = len([f for f in os.listdir(dir_name) if os.path.isfile(os.path.join(dir_name, f))])
            print(f"   ✅ {dir_name}: 存在 ({file_count} 个文件)")
        else:
            print(f"   ❌ {dir_name}: 不存在")
    
    print()

def main():
    """主函数"""
    print("🎙️ 录音过程监控工具")
    print("=" * 50)
    
    show_system_info()
    analyze_existing_files()
    
    print("🚀 开始实时监控...")
    print("💡 提示: 在浏览器中访问 http://localhost:5050/voice_test_sync.html 进行录音测试")
    print("⌨️ 按 Ctrl+C 停止监控")
    print("=" * 50)
    
    try:
        # 启动监控
        monitor_cache_files()
    except KeyboardInterrupt:
        print("\n\n👋 监控已停止")
    except Exception as e:
        print(f"\n❌ 监控出错: {e}")
    
    print("\n📊 最终分析:")
    analyze_existing_files()
    
    print("\n✅ 监控完成")

if __name__ == "__main__":
    main()