util.py 6.68 KB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AIfeng/2025-01-02 10:27:06
通用工具模块
提供日志、时间、文件操作等通用功能
"""

import os
import sys
import time
import logging
from datetime import datetime
from typing import Any, Optional

# 先确保日志目录存在
def _ensure_logs_dir():
    """确保日志目录存在"""
    logs_dir = 'logs'
    if not os.path.exists(logs_dir):
        os.makedirs(logs_dir, exist_ok=True)

# 初始化日志目录
_ensure_logs_dir()

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('logs/system.log', encoding='utf-8')
    ]
)

logger = logging.getLogger(__name__)

# 日志级别映射
LOG_LEVELS = {
    1: logging.INFO,
    2: logging.WARNING, 
    3: logging.ERROR,
    4: logging.CRITICAL
}

def log(level: int, message: str, username: str = "System"):
    """
    统一日志输出函数
    
    Args:
        level: 日志级别 (1=INFO, 2=WARNING, 3=ERROR, 4=CRITICAL)
        message: 日志消息
        username: 用户名
    """
    log_level = LOG_LEVELS.get(level, logging.INFO)
    formatted_message = f"[{username}] {message}"
    
    logger.log(log_level, formatted_message)
    
    # 同时输出到控制台(带颜色)
    timestamp = datetime.now().strftime('%H:%M:%S')
    
    if level == 1:  # INFO - 蓝色
        print(f"\033[94m[{timestamp}] [INFO] [{username}] {message}\033[0m")
    elif level == 2:  # WARNING - 黄色
        print(f"\033[93m[{timestamp}] [WARN] [{username}] {message}\033[0m")
    elif level == 3:  # ERROR - 红色
        print(f"\033[91m[{timestamp}] [ERROR] [{username}] {message}\033[0m")
    elif level == 4:  # CRITICAL - 紫色
        print(f"\033[95m[{timestamp}] [CRITICAL] [{username}] {message}\033[0m")
    else:
        print(f"[{timestamp}] [UNKNOWN] [{username}] {message}")

def printInfo(level: int, username: str, message: str):
    """
    兼容性函数,与原项目保持一致
    
    Args:
        level: 日志级别
        username: 用户名
        message: 消息内容
    """
    log(level, message, username)

def get_current_time() -> str:
    """
    获取当前时间字符串
    
    Returns:
        格式化的时间字符串
    """
    return datetime.now().strftime('%Y-%m-%d %H:%M:%S')

def get_timestamp() -> float:
    """
    获取当前时间戳
    
    Returns:
        时间戳
    """
    return time.time()

def ensure_dir(directory: str) -> bool:
    """
    确保目录存在,如果不存在则创建
    
    Args:
        directory: 目录路径
        
    Returns:
        是否成功
    """
    try:
        if not os.path.exists(directory):
            os.makedirs(directory, exist_ok=True)
            log(1, f"创建目录: {directory}")
        return True
    except Exception as e:
        log(3, f"创建目录失败 {directory}: {e}")
        return False

def safe_filename(filename: str) -> str:
    """
    生成安全的文件名
    
    Args:
        filename: 原始文件名
        
    Returns:
        安全的文件名
    """
    # 移除或替换不安全的字符
    unsafe_chars = '<>:"/\\|?*'
    safe_name = filename
    
    for char in unsafe_chars:
        safe_name = safe_name.replace(char, '_')
        
    return safe_name

def format_duration(seconds: float) -> str:
    """
    格式化持续时间
    
    Args:
        seconds: 秒数
        
    Returns:
        格式化的时间字符串
    """
    if seconds < 60:
        return f"{seconds:.1f}秒"
    elif seconds < 3600:
        minutes = seconds / 60
        return f"{minutes:.1f}分钟"
    else:
        hours = seconds / 3600
        return f"{hours:.1f}小时"

def format_file_size(size_bytes: int) -> str:
    """
    格式化文件大小
    
    Args:
        size_bytes: 字节数
        
    Returns:
        格式化的大小字符串
    """
    if size_bytes < 1024:
        return f"{size_bytes} B"
    elif size_bytes < 1024 * 1024:
        return f"{size_bytes / 1024:.1f} KB"
    elif size_bytes < 1024 * 1024 * 1024:
        return f"{size_bytes / (1024 * 1024):.1f} MB"
    else:
        return f"{size_bytes / (1024 * 1024 * 1024):.1f} GB"

def is_valid_audio_file(filepath: str) -> bool:
    """
    检查是否为有效的音频文件
    
    Args:
        filepath: 文件路径
        
    Returns:
        是否为有效音频文件
    """
    if not os.path.exists(filepath):
        return False
        
    valid_extensions = ['.wav', '.mp3', '.flac', '.aac', '.ogg', '.m4a']
    file_ext = os.path.splitext(filepath)[1].lower()
    
    return file_ext in valid_extensions

def cleanup_temp_files(temp_dir: str, max_age_hours: int = 24):
    """
    清理临时文件
    
    Args:
        temp_dir: 临时目录
        max_age_hours: 最大保留时间(小时)
    """
    try:
        if not os.path.exists(temp_dir):
            return
            
        current_time = time.time()
        max_age_seconds = max_age_hours * 3600
        
        for filename in os.listdir(temp_dir):
            filepath = os.path.join(temp_dir, filename)
            
            if os.path.isfile(filepath):
                file_age = current_time - os.path.getmtime(filepath)
                
                if file_age > max_age_seconds:
                    try:
                        os.remove(filepath)
                        log(1, f"清理临时文件: {filename}")
                    except Exception as e:
                        log(2, f"清理文件失败 {filename}: {e}")
                        
    except Exception as e:
        log(3, f"清理临时文件时出错: {e}")

def get_system_info() -> dict:
    """
    获取系统信息
    
    Returns:
        系统信息字典
    """
    import platform
    import psutil
    
    try:
        return {
            'platform': platform.platform(),
            'python_version': platform.python_version(),
            'cpu_count': psutil.cpu_count(),
            'memory_total': psutil.virtual_memory().total,
            'memory_available': psutil.virtual_memory().available,
            'disk_usage': psutil.disk_usage('/').percent if os.name != 'nt' else psutil.disk_usage('C:').percent
        }
    except Exception as e:
        log(2, f"获取系统信息时出错: {e}")
        return {
            'platform': platform.platform(),
            'python_version': platform.python_version(),
            'error': str(e)
        }

# 初始化日志目录
ensure_dir('logs')
ensure_dir('cache_data')
ensure_dir('temp')

# 启动时记录系统信息
log(1, "工具模块已加载")
system_info = get_system_info()
log(1, f"系统信息: {system_info}")