戒酒的李白

Initial implementation of the forum recording feature.

"""
ForumEgine - 监控和记录三个Engine的SummaryNode和ReportFormattingNode输出
"""
from .monitor import LogMonitor
__all__ = ['LogMonitor']
... ...
"""
日志监控器 - 实时监控三个log文件中的SummaryNode和ReportFormattingNode输出
"""
import os
import time
import threading
from pathlib import Path
from datetime import datetime
import re
from typing import Dict, Optional, List
from threading import Lock
class LogMonitor:
    """Poll the engine log files and mirror target-node output into forum.log.

    A background thread watches ``insight.log``, ``media.log`` and
    ``query.log`` inside ``log_dir``. While idle it waits for a line that
    mentions ``FirstSummaryNode``; that starts a "search session", during
    which every new line mentioning one of ``target_nodes`` is appended to
    ``forum.log``. A session ends when any monitored log shrinks or after
    ``MAX_INACTIVE_POLLS`` consecutive polls without activity.

    Attributes:
        log_dir: Directory containing the monitored logs and forum.log.
        forum_log_file: Path of the forum.log output file.
        monitored_logs: Mapping of application name to its log file path.
        is_monitoring: True while the polling loop should keep running.
        monitor_thread: The background polling thread, or None.
        file_positions: Per-application byte offset up to which complete
            lines have already been consumed.
        file_sizes: Per-application file size seen at the previous poll;
            used to detect growth and truncation.
        file_line_counts: Per-application line count taken once when the
            polling loop starts (baseline only; not updated afterwards).
        is_searching: True while a search session is active.
        search_inactive_count: Consecutive idle polls in the current session.
        write_lock: Serialises every write to forum.log.
        target_nodes: Node names whose log lines are recorded.
    """

    # Seconds slept between two polls of the log files.
    POLL_INTERVAL_SECONDS = 1
    # Seconds slept after an unexpected error inside the polling loop.
    ERROR_BACKOFF_SECONDS = 2
    # Consecutive idle polls after which an active session is closed.
    MAX_INACTIVE_POLLS = 30
    # Node name whose appearance opens a new search session.
    TRIGGER_NODE = 'FirstSummaryNode'
    # Matches a leading "[HH:MM:SS]" timestamp and captures what follows it.
    _TIMESTAMP_PREFIX = re.compile(r'\[\d{2}:\d{2}:\d{2}\]\s*(.+)')

    def __init__(self, log_dir: str = "logs"):
        """Set up paths and state; create ``log_dir`` if it is missing.

        Args:
            log_dir: Directory holding the monitored logs and forum.log.
        """
        self.log_dir = Path(log_dir)
        self.forum_log_file = self.log_dir / "forum.log"

        # Log files to watch, keyed by application name
        self.monitored_logs = {
            'insight': self.log_dir / 'insight.log',
            'media': self.log_dir / 'media.log',
            'query': self.log_dir / 'query.log',
        }

        # Monitoring state
        self.is_monitoring = False
        self.monitor_thread = None
        self.file_positions: Dict[str, int] = {}
        self.file_sizes: Dict[str, int] = {}
        self.file_line_counts: Dict[str, int] = {}
        self.is_searching = False
        self.search_inactive_count = 0
        self.write_lock = Lock()

        # Target node names, matched by plain substring search
        self.target_nodes = [
            'FirstSummaryNode',
            'ReflectionSummaryNode',
            'ReportFormattingNode',
        ]

        # parents=True so a nested log_dir also works
        self.log_dir.mkdir(parents=True, exist_ok=True)

    def clear_forum_log(self):
        """Truncate forum.log and write the session start marker.

        Errors are reported on stdout and otherwise ignored.
        """
        try:
            # Hold the write lock so a concurrent write_to_forum_log() cannot
            # interleave with the reset. Mode 'w' already truncates, so no
            # separate unlink is needed.
            with self.write_lock:
                with open(self.forum_log_file, 'w', encoding='utf-8') as f:
                    start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                    f.write(f"=== ForumEgine 监控开始 - {start_time} ===\n")
            print("ForumEgine: forum.log 已清空并初始化")
        except OSError as e:
            print(f"ForumEgine: 清空forum.log失败: {e}")

    def write_to_forum_log(self, content: str):
        """Append ``content`` to forum.log with an ``[HH:MM:SS]`` prefix.

        The write happens under ``write_lock``. Errors are reported on
        stdout and otherwise ignored.

        Args:
            content: Text to append; a newline is added automatically.
        """
        try:
            with self.write_lock:
                with open(self.forum_log_file, 'a', encoding='utf-8') as f:
                    timestamp = datetime.now().strftime('%H:%M:%S')
                    f.write(f"[{timestamp}] {content}\n")
                    f.flush()
        except (OSError, UnicodeError) as e:
            print(f"ForumEgine: 写入forum.log失败: {e}")

    def is_target_log_line(self, line: str) -> bool:
        """Return True if ``line`` mentions any name in ``target_nodes``."""
        return any(node_name in line for node_name in self.target_nodes)

    def extract_node_content(self, line: str) -> Optional[str]:
        """Return the part of ``line`` after its ``[HH:MM:SS]`` timestamp.

        Expected format: ``[HH:MM:SS] [NodeName] message``. When no timestamp
        is found, the whole line is returned with surrounding whitespace
        removed.
        """
        match = self._TIMESTAMP_PREFIX.search(line)
        if match:
            return match.group(1).strip()
        return line.strip()

    def get_file_size(self, file_path: Path) -> int:
        """Return the size of ``file_path`` in bytes, or 0 if it cannot be read."""
        try:
            return file_path.stat().st_size
        except OSError:
            return 0

    def get_file_line_count(self, file_path: Path) -> int:
        """Return the number of lines in ``file_path``, or 0 if it cannot be read."""
        try:
            # errors='replace' keeps a stray invalid byte from zeroing the count
            with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                return sum(1 for _ in f)
        except OSError:
            return 0

    def read_new_lines(self, file_path: Path, app_name: str) -> List[str]:
        """Return the complete lines appended to ``file_path`` since the last call.

        The file is read in binary mode so ``file_positions`` holds exact byte
        offsets. Only text up to the last newline is consumed: a trailing
        line that is still being written stays unread until its newline
        arrives, so it is never returned in two pieces.

        Args:
            file_path: Log file to read.
            app_name: Key under which the read offset is stored.

        Returns:
            The new lines, stripped of surrounding whitespace, with blank
            lines removed. Empty when nothing complete is available.
        """
        new_lines: List[str] = []
        try:
            last_position = self.file_positions.get(app_name, 0)
            with open(file_path, 'rb') as f:
                current_size = os.fstat(f.fileno()).st_size
                # A smaller file means it was truncated: start over from the top
                if current_size < last_position:
                    last_position = 0
                    self.file_positions[app_name] = 0
                if current_size <= last_position:
                    return new_lines
                f.seek(last_position)
                chunk = f.read()
        except FileNotFoundError:
            return new_lines
        except OSError as e:
            print(f"ForumEgine: 读取{app_name}日志失败: {e}")
            return new_lines

        # Consume only up to (and including) the last newline
        complete_length = chunk.rfind(b'\n') + 1
        self.file_positions[app_name] = last_position + complete_length
        text = chunk[:complete_length].decode('utf-8', errors='replace')
        for raw_line in text.split('\n'):
            stripped = raw_line.strip()
            if stripped:
                new_lines.append(stripped)
        return new_lines

    def _record_lines(self, app_name: str, new_lines: List[str]) -> bool:
        """Open a session if triggered, then record target lines; return True if any was written."""
        if not self.is_searching and any(self.TRIGGER_NODE in line for line in new_lines):
            print(f"ForumEgine: 在{app_name}中检测到FirstSummaryNode,开始监控记录")
            self.is_searching = True
            self.search_inactive_count = 0
            # Start the new session with a fresh forum.log
            self.clear_forum_log()

        if not self.is_searching:
            return False

        captured = False
        for line in new_lines:
            if self.is_target_log_line(line):
                formatted_content = f"[{app_name.upper()}] {line}"
                self.write_to_forum_log(formatted_content)
                print(f"ForumEgine: 捕获 - {formatted_content}")
                captured = True
        return captured

    def _poll_once(self):
        """Check every monitored log once; return (any_growth, any_shrink, captured_any)."""
        any_growth = False
        any_shrink = False
        captured_any = False
        for app_name, log_file in self.monitored_logs.items():
            # Comparing sizes is a single stat() call; no need to re-read the
            # whole file on every poll just to count its lines.
            current_size = self.get_file_size(log_file)
            previous_size = self.file_sizes.get(app_name, 0)
            if current_size > previous_size:
                any_growth = True
                new_lines = self.read_new_lines(log_file, app_name)
                if self._record_lines(app_name, new_lines):
                    captured_any = True
            elif current_size < previous_size:
                any_shrink = True
                print(f"ForumEgine: 检测到 {app_name} 日志缩短,将重置基线")
                # Reset the read offset to the new end of file
                self.file_positions[app_name] = current_size
            self.file_sizes[app_name] = current_size
        return any_growth, any_shrink, captured_any

    def _update_session_state(self, any_growth: bool, any_shrink: bool, captured_any: bool):
        """End the active session on log shrink or prolonged inactivity."""
        if not self.is_searching:
            return

        if any_shrink:
            # A log got shorter: close the session and wait for the next trigger
            print("ForumEgine: 日志缩短,结束当前搜索会话,回到等待状态")
            self.is_searching = False
            self.search_inactive_count = 0
            end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            self.write_to_forum_log(f"=== ForumEgine 搜索会话结束 - {end_time} ===")
            print("ForumEgine: 已重置基线,等待下次FirstSummaryNode触发")
        elif not any_growth and not captured_any:
            # Nothing grew and nothing was captured: count one idle poll
            self.search_inactive_count += 1
            if self.search_inactive_count >= self.MAX_INACTIVE_POLLS:
                print("ForumEgine: 长时间无活动,结束搜索会话")
                self.is_searching = False
                self.search_inactive_count = 0
                end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                self.write_to_forum_log(f"=== ForumEgine 搜索会话超时结束 - {end_time} ===")
        else:
            self.search_inactive_count = 0

    def monitor_logs(self):
        """Run the polling loop until ``is_monitoring`` becomes False.

        The current end of every monitored log is taken as the baseline, so
        only content written after the loop starts is ever examined.
        Unexpected errors inside one poll are printed with a traceback and
        the loop continues after ``ERROR_BACKOFF_SECONDS``.
        """
        print("ForumEgine: 开始智能监控日志文件...")

        # Record the current state of each file as the baseline
        for app_name, log_file in self.monitored_logs.items():
            self.file_line_counts[app_name] = self.get_file_line_count(log_file)
            baseline_size = self.get_file_size(log_file)
            self.file_positions[app_name] = baseline_size
            self.file_sizes[app_name] = baseline_size
            print(f"ForumEgine: {app_name} 基线行数: {self.file_line_counts[app_name]}")

        while self.is_monitoring:
            try:
                any_growth, any_shrink, captured_any = self._poll_once()
                self._update_session_state(any_growth, any_shrink, captured_any)
                time.sleep(self.POLL_INTERVAL_SECONDS)
            except Exception as e:
                # Loop boundary: report the error and keep the monitor alive
                print(f"ForumEgine: 监控过程中出错: {e}")
                import traceback
                traceback.print_exc()
                time.sleep(self.ERROR_BACKOFF_SECONDS)

        print("ForumEgine: 停止监控日志文件")

    def start_monitoring(self):
        """Start the background polling thread.

        Returns:
            True if the thread was started; False if monitoring was already
            running or the thread could not be started.
        """
        if self.is_monitoring:
            print("ForumEgine: 监控已经在运行中")
            return False

        try:
            self.is_monitoring = True
            self.monitor_thread = threading.Thread(target=self.monitor_logs, daemon=True)
            self.monitor_thread.start()
            print("ForumEgine: 智能监控已启动")
            return True
        except Exception as e:
            print(f"ForumEgine: 启动监控失败: {e}")
            self.is_monitoring = False
            return False

    def stop_monitoring(self):
        """Stop the polling thread and append the end marker to forum.log.

        Waits up to two seconds for the thread to finish. Does nothing except
        print a notice when monitoring is not running.
        """
        if not self.is_monitoring:
            print("ForumEgine: 监控未运行")
            return

        try:
            self.is_monitoring = False
            if self.monitor_thread and self.monitor_thread.is_alive():
                self.monitor_thread.join(timeout=2)

            end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            self.write_to_forum_log(f"=== ForumEgine 监控结束 - {end_time} ===")
            print("ForumEgine: 监控已停止")
        except Exception as e:
            print(f"ForumEgine: 停止监控失败: {e}")

    def get_forum_log_content(self) -> List[str]:
        """Return the lines of forum.log without their line terminators.

        Returns an empty list when the file does not exist or cannot be read.
        """
        try:
            with open(self.forum_log_file, 'r', encoding='utf-8') as f:
                return [line.rstrip('\n\r') for line in f]
        except FileNotFoundError:
            return []
        except (OSError, UnicodeDecodeError) as e:
            print(f"ForumEgine: 读取forum.log失败: {e}")
            return []
# Module-wide monitor singleton, created on first use
_monitor_instance = None


def get_monitor() -> LogMonitor:
    """Return the shared LogMonitor, constructing it on the first call."""
    global _monitor_instance
    if _monitor_instance is not None:
        return _monitor_instance
    _monitor_instance = LogMonitor()
    return _monitor_instance
def start_forum_monitoring():
    """Start monitoring on the shared monitor and return its result."""
    shared_monitor = get_monitor()
    return shared_monitor.start_monitoring()
def stop_forum_monitoring():
    """Stop monitoring on the shared monitor."""
    shared_monitor = get_monitor()
    shared_monitor.stop_monitoring()
def get_forum_log():
    """Return the forum.log lines as read by the shared monitor."""
    shared_monitor = get_monitor()
    return shared_monitor.get_forum_log_content()
... ...
... ... @@ -79,7 +79,7 @@ class ReportStructureNode(StateMutationNode):
cleaned_output = clean_json_tags(cleaned_output)
# 记录清理后的输出用于调试
self.log_info(f"清理后的输出: {cleaned_output[:200]}...")
self.log_info(f"清理后的输出: {cleaned_output}")
# 解析JSON
try:
... ...
... ... @@ -93,7 +93,7 @@ class FirstSearchNode(BaseNode):
cleaned_output = clean_json_tags(cleaned_output)
# 记录清理后的输出用于调试
self.log_info(f"清理后的输出: {cleaned_output[:200]}...")
self.log_info(f"清理后的输出: {cleaned_output}")
# 解析JSON
try:
... ... @@ -228,7 +228,7 @@ class ReflectionNode(BaseNode):
cleaned_output = clean_json_tags(cleaned_output)
# 记录清理后的输出用于调试
self.log_info(f"清理后的输出: {cleaned_output[:200]}...")
self.log_info(f"清理后的输出: {cleaned_output}")
# 解析JSON
try:
... ...
... ... @@ -97,7 +97,7 @@ class FirstSummaryNode(StateMutationNode):
cleaned_output = clean_json_tags(cleaned_output)
# 记录清理后的输出用于调试
self.log_info(f"清理后的输出: {cleaned_output[:200]}...")
self.log_info(f"清理后的输出: {cleaned_output}")
# 解析JSON
try:
... ... @@ -243,7 +243,7 @@ class ReflectionSummaryNode(StateMutationNode):
cleaned_output = clean_json_tags(cleaned_output)
# 记录清理后的输出用于调试
self.log_info(f"清理后的输出: {cleaned_output[:200]}...")
self.log_info(f"清理后的输出: {cleaned_output}")
# 解析JSON
try:
... ...
... ... @@ -79,7 +79,7 @@ class ReportStructureNode(StateMutationNode):
cleaned_output = clean_json_tags(cleaned_output)
# 记录清理后的输出用于调试
self.log_info(f"清理后的输出: {cleaned_output[:200]}...")
self.log_info(f"清理后的输出: {cleaned_output}")
# 解析JSON
try:
... ...
... ... @@ -93,7 +93,7 @@ class FirstSearchNode(BaseNode):
cleaned_output = clean_json_tags(cleaned_output)
# 记录清理后的输出用于调试
self.log_info(f"清理后的输出: {cleaned_output[:200]}...")
self.log_info(f"清理后的输出: {cleaned_output}")
# 解析JSON
try:
... ... @@ -228,7 +228,7 @@ class ReflectionNode(BaseNode):
cleaned_output = clean_json_tags(cleaned_output)
# 记录清理后的输出用于调试
self.log_info(f"清理后的输出: {cleaned_output[:200]}...")
self.log_info(f"清理后的输出: {cleaned_output}")
# 解析JSON
try:
... ...
... ... @@ -97,7 +97,7 @@ class FirstSummaryNode(StateMutationNode):
cleaned_output = clean_json_tags(cleaned_output)
# 记录清理后的输出用于调试
self.log_info(f"清理后的输出: {cleaned_output[:200]}...")
self.log_info(f"清理后的输出: {cleaned_output}")
# 解析JSON
try:
... ... @@ -243,7 +243,7 @@ class ReflectionSummaryNode(StateMutationNode):
cleaned_output = clean_json_tags(cleaned_output)
# 记录清理后的输出用于调试
self.log_info(f"清理后的输出: {cleaned_output[:200]}...")
self.log_info(f"清理后的输出: {cleaned_output}")
# 解析JSON
try:
... ...
... ... @@ -79,7 +79,7 @@ class ReportStructureNode(StateMutationNode):
cleaned_output = clean_json_tags(cleaned_output)
# 记录清理后的输出用于调试
self.log_info(f"清理后的输出: {cleaned_output[:200]}...")
self.log_info(f"清理后的输出: {cleaned_output}")
# 解析JSON
try:
... ...
... ... @@ -93,7 +93,7 @@ class FirstSearchNode(BaseNode):
cleaned_output = clean_json_tags(cleaned_output)
# 记录清理后的输出用于调试
self.log_info(f"清理后的输出: {cleaned_output[:200]}...")
self.log_info(f"清理后的输出: {cleaned_output}")
# 解析JSON
try:
... ... @@ -228,7 +228,7 @@ class ReflectionNode(BaseNode):
cleaned_output = clean_json_tags(cleaned_output)
# 记录清理后的输出用于调试
self.log_info(f"清理后的输出: {cleaned_output[:200]}...")
self.log_info(f"清理后的输出: {cleaned_output}")
# 解析JSON
try:
... ...
... ... @@ -97,7 +97,7 @@ class FirstSummaryNode(StateMutationNode):
cleaned_output = clean_json_tags(cleaned_output)
# 记录清理后的输出用于调试
self.log_info(f"清理后的输出: {cleaned_output[:200]}...")
self.log_info(f"清理后的输出: {cleaned_output}")
# 解析JSON
try:
... ... @@ -243,7 +243,7 @@ class ReflectionSummaryNode(StateMutationNode):
cleaned_output = clean_json_tags(cleaned_output)
# 记录清理后的输出用于调试
self.log_info(f"清理后的输出: {cleaned_output[:200]}...")
self.log_info(f"清理后的输出: {cleaned_output}")
# 解析JSON
try:
... ...
... ... @@ -30,6 +30,39 @@ os.environ['PYTHONUTF8'] = '1'
# Directory for log files (forum.log is created here); made at import time if missing
LOG_DIR = Path('logs')
LOG_DIR.mkdir(exist_ok=True)
# Initialise ForumEgine's forum.log file
def init_forum_log():
    """Create ``LOG_DIR/forum.log`` with a header line if it does not exist.

    An existing file is left untouched. Any failure is reported on stdout
    and otherwise ignored.
    """
    try:
        forum_log_file = LOG_DIR / "forum.log"
        # Mode 'x' creates the file only if it is absent, in a single step,
        # so there is no window between an existence check and the create.
        with open(forum_log_file, 'x', encoding='utf-8') as f:
            start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            f.write(f"=== ForumEgine 系统初始化 - {start_time} ===\n")
    except FileExistsError:
        # Already initialised earlier; keep the existing content
        pass
    except Exception as e:
        print(f"ForumEgine: 初始化forum.log失败: {e}")
    else:
        print("ForumEgine: forum.log 已初始化")
# 初始化forum.log
init_forum_log()
# Launch ForumEgine's background monitoring
def start_forum_engine():
    """Start the ForumEgine monitor and print whether it came up.

    The import is done inside the function; any exception (including a
    failed import) is printed rather than raised.
    """
    try:
        from ForumEgine.monitor import start_forum_monitoring
        print("ForumEgine: 启动智能监控...")
        if start_forum_monitoring():
            outcome = "ForumEgine: 智能监控已启动,将自动检测搜索活动"
        else:
            outcome = "ForumEgine: 智能监控启动失败"
        print(outcome)
    except Exception as e:
        print(f"ForumEgine: 启动智能监控失败: {e}")
# 启动ForumEgine
start_forum_engine()
# 全局变量存储进程信息
processes = {
'insight': {'process': None, 'port': 8501, 'status': 'stopped', 'output': [], 'log_file': None},
... ... @@ -382,6 +415,43 @@ def test_log(app_name):
'message': f'测试消息已写入 {app_name} 日志'
})
@app.route('/api/forum/start')
def start_forum_monitoring_api():
    """Start the ForumEgine monitor on request and report the outcome as JSON."""
    try:
        from ForumEgine.monitor import start_forum_monitoring
        if start_forum_monitoring():
            payload = {'success': True, 'message': 'ForumEgine监控已启动'}
        else:
            payload = {'success': False, 'message': 'ForumEgine监控启动失败'}
        return jsonify(payload)
    except Exception as e:
        failure = {'success': False, 'message': f'启动监控失败: {e!s}'}
        return jsonify(failure)
@app.route('/api/forum/stop')
def stop_forum_monitoring_api():
    """Stop the ForumEgine monitor on request and report the outcome as JSON."""
    try:
        from ForumEgine.monitor import stop_forum_monitoring
        stop_forum_monitoring()
        payload = {'success': True, 'message': 'ForumEgine监控已停止'}
        return jsonify(payload)
    except Exception as e:
        failure = {'success': False, 'message': f'停止监控失败: {e!s}'}
        return jsonify(failure)
@app.route('/api/forum/log')
def get_forum_log():
    """Return the contents of ForumEgine's forum.log as JSON.

    On success the response carries ``log_lines`` (the list returned by the
    monitor module) and ``total_lines`` (its length). Any exception is
    reported as ``success: False`` with a message.
    """
    try:
        # Import under an alias: the helper has the same name as this view,
        # and importing it unaliased would rebind that name inside the body.
        from ForumEgine.monitor import get_forum_log as read_forum_log
        log_content = read_forum_log()
        return jsonify({
            'success': True,
            'log_lines': log_content,
            'total_lines': len(log_content)
        })
    except Exception as e:
        return jsonify({'success': False, 'message': f'读取forum.log失败: {str(e)}'})
@app.route('/api/search', methods=['POST'])
def search():
"""统一搜索接口"""
... ... @@ -391,6 +461,9 @@ def search():
if not query:
return jsonify({'success': False, 'message': '搜索查询不能为空'})
# ForumEgine智能监控已经在后台运行,会自动检测搜索活动
print("ForumEgine: 搜索请求已收到,智能监控将自动检测日志变化")
# 检查哪些应用正在运行
check_app_status()
running_apps = [name for name, info in processes.items() if info['status'] == 'running']
... ... @@ -418,6 +491,9 @@ def search():
except Exception as e:
results[app_name] = {'success': False, 'message': str(e)}
# 搜索完成后可以选择停止监控,或者让它继续运行以捕获后续的处理日志
# 这里我们让监控继续运行,用户可以通过其他接口手动停止
return jsonify({
'success': True,
'query': query,
... ...