monitor.py 12.5 KB
"""
日志监控器 - 实时监控三个log文件中的SummaryNode和ReportFormattingNode输出
"""

import os
import time
import threading
from pathlib import Path
from datetime import datetime
import re
from typing import Dict, Optional, List
from threading import Lock

class LogMonitor:
    """基于文件变化的智能日志监控器"""
    
    def __init__(self, log_dir: str = "logs"):
        """初始化日志监控器"""
        self.log_dir = Path(log_dir)
        self.forum_log_file = self.log_dir / "forum.log"
        
        # 要监控的日志文件
        self.monitored_logs = {
            'insight': self.log_dir / 'insight.log',
            'media': self.log_dir / 'media.log', 
            'query': self.log_dir / 'query.log'
        }
        
        # 监控状态
        self.is_monitoring = False
        self.monitor_thread = None
        self.file_positions = {}  # 记录每个文件的读取位置
        self.file_line_counts = {}  # 记录每个文件的行数
        self.is_searching = False  # 是否正在搜索
        self.search_inactive_count = 0  # 搜索非活跃计数器
        self.write_lock = Lock()  # 写入锁,防止并发写入冲突
        
        # 目标节点名称 - 直接匹配字符串
        self.target_nodes = [
            'FirstSummaryNode',
            'ReflectionSummaryNode', 
            'ReportFormattingNode'
        ]
        
        # 确保logs目录存在
        self.log_dir.mkdir(exist_ok=True)
    
    def clear_forum_log(self):
        """清空forum.log文件"""
        try:
            if self.forum_log_file.exists():
                self.forum_log_file.unlink()
            
            # 创建新的forum.log文件并写入开始标记
            with open(self.forum_log_file, 'w', encoding='utf-8') as f:
                start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                f.write(f"=== ForumEgine 监控开始 - {start_time} ===\n")
                
            print(f"ForumEgine: forum.log 已清空并初始化")
            
        except Exception as e:
            print(f"ForumEgine: 清空forum.log失败: {e}")
    
    def write_to_forum_log(self, content: str):
        """写入内容到forum.log(线程安全)"""
        try:
            with self.write_lock:  # 使用锁确保线程安全
                with open(self.forum_log_file, 'a', encoding='utf-8') as f:
                    timestamp = datetime.now().strftime('%H:%M:%S')
                    f.write(f"[{timestamp}] {content}\n")
                    f.flush()
        except Exception as e:
            print(f"ForumEgine: 写入forum.log失败: {e}")
    
    def is_target_log_line(self, line: str) -> bool:
        """检查是否是目标日志行(SummaryNode或ReportFormattingNode)"""
        # 简单字符串包含检查,更可靠
        for node_name in self.target_nodes:
            if node_name in line:
                return True
        return False
    
    def extract_node_content(self, line: str) -> Optional[str]:
        """提取节点内容"""
        # 移除时间戳部分,保留节点名称和消息
        # 格式: [HH:MM:SS] [NodeName] message
        match = re.search(r'\[\d{2}:\d{2}:\d{2}\]\s*(.+)', line)
        if match:
            return match.group(1).strip()
        return line.strip()
    
    def get_file_size(self, file_path: Path) -> int:
        """获取文件大小"""
        try:
            return file_path.stat().st_size if file_path.exists() else 0
        except:
            return 0
    
    def get_file_line_count(self, file_path: Path) -> int:
        """获取文件行数"""
        try:
            if not file_path.exists():
                return 0
            with open(file_path, 'r', encoding='utf-8') as f:
                return sum(1 for _ in f)
        except:
            return 0
    
    # 移除这个方法,逻辑已经合并到monitor_logs中
    
    def read_new_lines(self, file_path: Path, app_name: str) -> List[str]:
        """读取文件中的新行"""
        new_lines = []
        
        try:
            if not file_path.exists():
                return new_lines
            
            current_size = self.get_file_size(file_path)
            last_position = self.file_positions.get(app_name, 0)
            
            # 如果文件变小了,说明被清空了,重新从头开始
            if current_size < last_position:
                last_position = 0
            
            if current_size > last_position:
                with open(file_path, 'r', encoding='utf-8') as f:
                    f.seek(last_position)
                    new_content = f.read()
                    new_lines = new_content.split('\n')
                    
                    # 更新位置
                    self.file_positions[app_name] = f.tell()
                    
                    # 过滤空行
                    new_lines = [line.strip() for line in new_lines if line.strip()]
                    
        except Exception as e:
            print(f"ForumEgine: 读取{app_name}日志失败: {e}")
        
        return new_lines
    
    def monitor_logs(self):
        """智能监控日志文件"""
        print("ForumEgine: 开始智能监控日志文件...")
        
        # 初始化文件行数和位置 - 记录当前状态作为基线
        for app_name, log_file in self.monitored_logs.items():
            self.file_line_counts[app_name] = self.get_file_line_count(log_file)
            self.file_positions[app_name] = self.get_file_size(log_file)
            print(f"ForumEgine: {app_name} 基线行数: {self.file_line_counts[app_name]}")
        
        while self.is_monitoring:
            try:
                # 同时检测三个log文件的变化
                any_growth = False
                any_shrink = False
                captured_any = False
                
                # 为每个log文件独立处理
                for app_name, log_file in self.monitored_logs.items():
                    current_lines = self.get_file_line_count(log_file)
                    previous_lines = self.file_line_counts.get(app_name, 0)
                    
                    if current_lines > previous_lines:
                        any_growth = True
                        # 立即读取新增内容
                        new_lines = self.read_new_lines(log_file, app_name)
                        
                        # 先检查是否需要触发搜索(只触发一次)
                        if not self.is_searching:
                            for line in new_lines:
                                if line.strip() and 'FirstSummaryNode' in line:
                                    print(f"ForumEgine: 在{app_name}中检测到FirstSummaryNode,开始监控记录")
                                    self.is_searching = True
                                    self.search_inactive_count = 0
                                    # 清空forum.log开始新会话
                                    self.clear_forum_log()
                                    break  # 找到一个就够了,跳出循环
                        
                        # 处理所有新增内容(如果正在搜索状态)
                        if self.is_searching:
                            for line in new_lines:
                                if line.strip() and self.is_target_log_line(line):
                                    # 立即记录目标节点输出
                                    formatted_content = f"[{app_name.upper()}] {line.strip()}"
                                    self.write_to_forum_log(formatted_content)
                                    print(f"ForumEgine: 捕获 - {formatted_content}")
                                    captured_any = True
                    
                    elif current_lines < previous_lines:
                        any_shrink = True
                        print(f"ForumEgine: 检测到 {app_name} 日志缩短,将重置基线")
                        # 重置文件位置到新的文件末尾
                        self.file_positions[app_name] = self.get_file_size(log_file)
                    
                    # 更新行数记录
                    self.file_line_counts[app_name] = current_lines
                
                # 检查是否应该结束当前搜索会话
                if self.is_searching:
                    if any_shrink:
                        # log变短,结束当前搜索会话,重置为等待状态
                        print("ForumEgine: 日志缩短,结束当前搜索会话,回到等待状态")
                        self.is_searching = False
                        self.search_inactive_count = 0
                        # 写入结束标记
                        end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                        self.write_to_forum_log(f"=== ForumEgine 搜索会话结束 - {end_time} ===")
                        print("ForumEgine: 已重置基线,等待下次FirstSummaryNode触发")
                    elif not any_growth and not captured_any:
                        # 没有增长也没有捕获内容,增加非活跃计数
                        self.search_inactive_count += 1
                        if self.search_inactive_count >= 30:  # 30秒无活动才结束
                            print("ForumEgine: 长时间无活动,结束搜索会话")
                            self.is_searching = False
                            self.search_inactive_count = 0
                            # 写入结束标记
                            end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                            self.write_to_forum_log(f"=== ForumEgine 搜索会话超时结束 - {end_time} ===")
                    else:
                        self.search_inactive_count = 0  # 重置计数器
                
                # 短暂休眠
                time.sleep(1)
                
            except Exception as e:
                print(f"ForumEgine: 监控过程中出错: {e}")
                import traceback
                traceback.print_exc()
                time.sleep(2)
        
        print("ForumEgine: 停止监控日志文件")
    
    def start_monitoring(self):
        """开始智能监控"""
        if self.is_monitoring:
            print("ForumEgine: 监控已经在运行中")
            return False
        
        try:
            # 启动监控
            self.is_monitoring = True
            self.monitor_thread = threading.Thread(target=self.monitor_logs, daemon=True)
            self.monitor_thread.start()
            
            print("ForumEgine: 智能监控已启动")
            return True
            
        except Exception as e:
            print(f"ForumEgine: 启动监控失败: {e}")
            self.is_monitoring = False
            return False
    
    def stop_monitoring(self):
        """停止监控"""
        if not self.is_monitoring:
            print("ForumEgine: 监控未运行")
            return
        
        try:
            self.is_monitoring = False
            
            if self.monitor_thread and self.monitor_thread.is_alive():
                self.monitor_thread.join(timeout=2)
            
            # 写入结束标记
            end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            self.write_to_forum_log(f"=== ForumEgine 监控结束 - {end_time} ===")
            
            print("ForumEgine: 监控已停止")
            
        except Exception as e:
            print(f"ForumEgine: 停止监控失败: {e}")
    
    def get_forum_log_content(self) -> List[str]:
        """获取forum.log的内容"""
        try:
            if not self.forum_log_file.exists():
                return []
            
            with open(self.forum_log_file, 'r', encoding='utf-8') as f:
                return [line.rstrip('\n\r') for line in f.readlines()]
                
        except Exception as e:
            print(f"ForumEgine: 读取forum.log失败: {e}")
            return []


# 全局监控器实例
_monitor_instance = None

def get_monitor() -> LogMonitor:
    """获取全局监控器实例"""
    global _monitor_instance
    if _monitor_instance is None:
        _monitor_instance = LogMonitor()
    return _monitor_instance

def start_forum_monitoring():
    """启动ForumEgine智能监控"""
    return get_monitor().start_monitoring()

def stop_forum_monitoring():
    """停止ForumEgine监控"""
    get_monitor().stop_monitoring()

def get_forum_log():
    """获取forum.log内容"""
    return get_monitor().get_forum_log_content()