test_cache_preservation.py 7.55 KB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AIfeng/2025-01-02 15:35:00
缓存文件保留测试脚本
用于测试语音识别差异问题,保留缓存文件以便分析
"""

import os
import sys
import time
import subprocess
import signal
from datetime import datetime
from pathlib import Path

class CachePreservationTester:
    def __init__(self):
        self.project_root = Path.cwd()
        self.cache_dir = self.project_root / "cache_data"
        self.original_asr_server = self.project_root / "web" / "asr" / "funasr" / "ASR_server.py"
        self.no_cleanup_asr_server = self.project_root / "web" / "asr" / "funasr" / "ASR_server_no_cleanup.py"
        self.backup_asr_server = self.project_root / "web" / "asr" / "funasr" / "ASR_server_backup.py"
        
    def print_header(self):
        """打印测试头部信息"""
        print("=" * 80)
        print("🔧 缓存文件保留测试工具")
        print(f"⏰ 时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"📁 项目目录: {self.project_root}")
        print(f"💾 缓存目录: {self.cache_dir}")
        print("=" * 80)
        
    def check_files(self):
        """检查必要文件是否存在"""
        print("\n📋 检查文件状态:")
        
        files_to_check = [
            ("原始ASR服务器", self.original_asr_server),
            ("无清理ASR服务器", self.no_cleanup_asr_server),
            ("缓存目录", self.cache_dir)
        ]
        
        all_ok = True
        for name, path in files_to_check:
            if path.exists():
                print(f"✅ {name}: {path}")
            else:
                print(f"❌ {name}: {path} (不存在)")
                all_ok = False
                
        return all_ok
        
    def backup_original_server(self):
        """备份原始ASR服务器"""
        if not self.backup_asr_server.exists():
            try:
                import shutil
                shutil.copy2(self.original_asr_server, self.backup_asr_server)
                print(f"✅ 已备份原始ASR服务器到: {self.backup_asr_server}")
                return True
            except Exception as e:
                print(f"❌ 备份失败: {e}")
                return False
        else:
            print(f"ℹ️ 备份文件已存在: {self.backup_asr_server}")
            return True
            
    def switch_to_no_cleanup_server(self):
        """切换到无清理版本的ASR服务器"""
        try:
            import shutil
            shutil.copy2(self.no_cleanup_asr_server, self.original_asr_server)
            print(f"✅ 已切换到无清理版本的ASR服务器")
            return True
        except Exception as e:
            print(f"❌ 切换失败: {e}")
            return False
            
    def restore_original_server(self):
        """恢复原始ASR服务器"""
        if self.backup_asr_server.exists():
            try:
                import shutil
                shutil.copy2(self.backup_asr_server, self.original_asr_server)
                print(f"✅ 已恢复原始ASR服务器")
                return True
            except Exception as e:
                print(f"❌ 恢复失败: {e}")
                return False
        else:
            print(f"❌ 备份文件不存在: {self.backup_asr_server}")
            return False
            
    def clear_cache_dir(self):
        """清空缓存目录"""
        try:
            if self.cache_dir.exists():
                for file in self.cache_dir.glob("*.wav"):
                    file.unlink()
                print(f"✅ 已清空缓存目录")
            else:
                self.cache_dir.mkdir(exist_ok=True)
                print(f"✅ 已创建缓存目录")
            return True
        except Exception as e:
            print(f"❌ 清空缓存目录失败: {e}")
            return False
            
    def count_cache_files(self):
        """统计缓存文件数量"""
        if not self.cache_dir.exists():
            return 0
        return len(list(self.cache_dir.glob("*.wav")))
        
    def list_cache_files(self):
        """列出缓存文件"""
        if not self.cache_dir.exists():
            print("📭 缓存目录不存在")
            return
            
        wav_files = list(self.cache_dir.glob("*.wav"))
        if not wav_files:
            print("📭 缓存目录为空")
            return
            
        print(f"\n📁 缓存文件列表 ({len(wav_files)} 个文件):")
        for i, file in enumerate(wav_files, 1):
            stat = file.stat()
            size_kb = stat.st_size / 1024
            mtime = datetime.fromtimestamp(stat.st_mtime)
            print(f"  {i:2d}. {file.name}")
            print(f"      📊 大小: {size_kb:.1f} KB")
            print(f"      ⏰ 修改: {mtime.strftime('%H:%M:%S')}")
            
    def show_usage_instructions(self):
        """显示使用说明"""
        print("\n📖 使用说明:")
        print("1️⃣ 运行此脚本切换到无清理版本的ASR服务器")
        print("2️⃣ 启动录音服务: python server_recording_api_sync.py")
        print("3️⃣ 在浏览器中访问: http://localhost:5000/voice_test_sync.html")
        print("4️⃣ 进行录音测试,观察缓存文件保留情况")
        print("5️⃣ 分析保留的音频文件与识别结果的差异")
        print("6️⃣ 测试完成后运行此脚本恢复原始服务器")
        
    def interactive_menu(self):
        """交互式菜单"""
        while True:
            print("\n🔧 操作菜单:")
            print("1. 切换到无清理版本ASR服务器")
            print("2. 恢复原始ASR服务器")
            print("3. 清空缓存目录")
            print("4. 查看缓存文件")
            print("5. 显示使用说明")
            print("6. 退出")
            
            choice = input("\n请选择操作 (1-6): ").strip()
            
            if choice == '1':
                if self.backup_original_server():
                    self.switch_to_no_cleanup_server()
                    print("\n✅ 切换完成!现在可以启动录音服务进行测试")
                    print("   启动命令: python server_recording_api_sync.py")
                    
            elif choice == '2':
                self.restore_original_server()
                print("\n✅ 恢复完成!ASR服务器已恢复到原始版本")
                
            elif choice == '3':
                self.clear_cache_dir()
                
            elif choice == '4':
                self.list_cache_files()
                
            elif choice == '5':
                self.show_usage_instructions()
                
            elif choice == '6':
                print("\n👋 退出测试工具")
                break
                
            else:
                print("\n❌ 无效选择,请重新输入")
                
    def run(self):
        """运行测试工具"""
        self.print_header()
        
        if not self.check_files():
            print("\n❌ 文件检查失败,请确保项目结构完整")
            return False
            
        cache_count = self.count_cache_files()
        print(f"\n📊 当前缓存文件数量: {cache_count}")
        
        self.show_usage_instructions()
        self.interactive_menu()
        
        return True

def main():
    """主函数"""
    tester = CachePreservationTester()
    try:
        tester.run()
    except KeyboardInterrupt:
        print("\n\n⚠️ 用户中断操作")
    except Exception as e:
        print(f"\n❌ 运行出错: {e}")
        
if __name__ == "__main__":
    main()