test_scheduler.py 6.56 KB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AIfeng/2025-07-02 14:01:37
Scheduler模块测试脚本
"""

import time
import sys
import os

# 添加项目根目录到路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

def test_scheduler_import():
    """测试scheduler模块导入"""
    print("=== Scheduler模块导入测试 ===")
    
    try:
        from scheduler.thread_manager import MyThread, ThreadManager
        print("✓ MyThread 导入成功")
        print("✓ ThreadManager 导入成功")
        return True
    except ImportError as e:
        print(f"✗ 导入失败: {e}")
        return False
    except Exception as e:
        print(f"✗ 未知错误: {e}")
        return False

def test_mythread_basic():
    """测试MyThread基础功能"""
    print("\n=== MyThread基础功能测试 ===")
    
    try:
        from scheduler.thread_manager import MyThread
        
        def test_function(name, delay=1):
            print(f"  线程 {name} 开始执行")
            time.sleep(delay)
            print(f"  线程 {name} 执行完成")
            return f"结果来自 {name}"
        
        # 创建线程
        thread = MyThread(target=test_function, name="test_thread", 
                         args=("TestWorker", 0.5))
        print("✓ MyThread 创建成功")
        
        # 启动线程
        thread.start()
        print("✓ 线程启动成功")
        
        # 等待完成
        thread.join()
        print("✓ 线程执行完成")
        
        # 获取结果
        result = thread.get_result()
        print(f"✓ 获取结果: {result}")
        
        # 获取运行时长
        duration = thread.get_duration()
        print(f"✓ 运行时长: {duration:.2f}秒")
        
        return True
        
    except Exception as e:
        print(f"✗ MyThread测试失败: {e}")
        import traceback
        traceback.print_exc()
        return False

def test_mythread_exception():
    """测试MyThread异常处理"""
    print("\n=== MyThread异常处理测试 ===")
    
    try:
        from scheduler.thread_manager import MyThread
        
        def error_function():
            raise ValueError("这是一个测试异常")
        
        # 创建会出错的线程
        thread = MyThread(target=error_function, name="error_thread")
        thread.start()
        thread.join()
        
        # 检查异常
        exception = thread.get_exception()
        if exception:
            print(f"✓ 异常捕获成功: {exception}")
            return True
        else:
            print("✗ 未捕获到预期异常")
            return False
            
    except Exception as e:
        print(f"✗ 异常处理测试失败: {e}")
        return False

def test_thread_manager():
    """测试ThreadManager功能"""
    print("\n=== ThreadManager功能测试 ===")
    
    try:
        from scheduler.thread_manager import ThreadManager
        
        def worker_function(worker_id, duration=0.5):
            print(f"  Worker {worker_id} 开始工作")
            time.sleep(duration)
            print(f"  Worker {worker_id} 完成工作")
            return f"Worker {worker_id} 的结果"
        
        # 创建线程管理器
        manager = ThreadManager()
        print("✓ ThreadManager 创建成功")
        
        # 创建多个线程
        for i in range(3):
            thread = manager.create_thread(
                name=f"worker_{i}",
                target=worker_function,
                args=(i, 0.3)
            )
            print(f"✓ 创建线程 worker_{i}")
        
        # 启动所有线程
        for i in range(3):
            success = manager.start_thread(f"worker_{i}")
            if success:
                print(f"✓ 启动线程 worker_{i}")
            else:
                print(f"✗ 启动线程 worker_{i} 失败")
        
        # 等待一段时间
        time.sleep(1)
        
        # 获取状态
        status = manager.get_thread_status()
        print(f"✓ 线程状态: {len(status)} 个线程")
        for name, info in status.items():
            print(f"  {name}: alive={info['alive']}, duration={info['duration']:.2f}s")
        
        # 停止所有线程
        success = manager.stop_all_threads()
        print(f"✓ 停止所有线程: {'成功' if success else '部分失败'}")
        
        # 清理
        manager.cleanup_finished_threads()
        print("✓ 清理完成的线程")
        
        return True
        
    except Exception as e:
        print(f"✗ ThreadManager测试失败: {e}")
        import traceback
        traceback.print_exc()
        return False

def test_project_integration():
    """测试项目集成"""
    print("\n=== 项目集成测试 ===")
    
    try:
        # 测试recorder_sync.py中的导入
        print("测试 recorder_sync.py 导入...")
        from recorder_sync import RecorderSync
        print("✓ recorder_sync.py 导入成功")
        
        # 测试scheduler模块在recorder_sync中的可用性
        print("测试 scheduler 模块在项目中的可用性...")
        import scheduler.thread_manager
        print("✓ scheduler 模块在项目中可用")
        
        return True
        
    except Exception as e:
        print(f"✗ 项目集成测试失败: {e}")
        import traceback
        traceback.print_exc()
        return False

def main():
    """主测试函数"""
    print("🔍 Scheduler模块测试")
    print("=" * 50)
    
    tests = [
        ("模块导入", test_scheduler_import),
        ("MyThread基础功能", test_mythread_basic),
        ("MyThread异常处理", test_mythread_exception),
        ("ThreadManager功能", test_thread_manager),
        ("项目集成", test_project_integration)
    ]
    
    passed = 0
    total = len(tests)
    
    for test_name, test_func in tests:
        print(f"\n🧪 运行测试: {test_name}")
        try:
            if test_func():
                passed += 1
                print(f"✅ {test_name} 测试通过")
            else:
                print(f"❌ {test_name} 测试失败")
        except Exception as e:
            print(f"❌ {test_name} 测试异常: {e}")
    
    print("\n" + "=" * 50)
    print(f"📊 测试结果: {passed}/{total} 通过")
    
    if passed == total:
        print("🎉 所有测试通过!Scheduler模块工作正常。")
        print("\n可以继续运行项目:")
        print("  python main_sync.py")
        return 0
    else:
        print("❌ 部分测试失败,请检查错误信息。")
        return 1

if __name__ == "__main__":
    sys.exit(main())