马一丁

Update the storage scheme for GraphRAG query logs

... ... @@ -11,6 +11,7 @@ from dataclasses import dataclass, field
from typing import Dict, Any, List, Optional
from loguru import logger
from utils.knowledge_logger import append_knowledge_log, compact_records
from .base_node import BaseNode
from ..llms.base import LLMClient
... ... @@ -122,6 +123,8 @@ class GraphRAGQueryNode(BaseNode):
            Merged query results
        """
        self.log_info(f"Starting GraphRAG query, chapter: {section.get('title', 'unknown')}")
        chapter_id = section.get("id") or section.get("chapter_id") or section.get("chapterId")
        chapter_title = section.get("title", "unknown")
        query_engine = QueryEngine(graph)
        history = QueryHistory()
... ... @@ -154,11 +157,38 @@ class GraphRAGQueryNode(BaseNode):
                engine_filter=decision.get('engine_filter'),
                depth=decision.get('depth', 1)
            )
            params_dict = {
                'keywords': params.keywords,
                'node_types': params.node_types,
                'engine_filter': params.engine_filter,
                'depth': params.depth,
            }

            result = query_engine.query(params)
            all_results.append(result)
            self.log_info(f"Query returned {result.total_nodes} nodes")

            try:
                append_knowledge_log(
                    "GRAPH_QUERY_NODE",
                    {
                        "chapter_id": chapter_id or "",
                        "chapter_title": chapter_title,
                        "round": round_idx + 1,
                        "params": params_dict,
                        "result_counts": {
                            "matched_sections": len(result.matched_sections),
                            "matched_queries": len(result.matched_queries),
                            "matched_sources": len(result.matched_sources),
                            "total_nodes": result.total_nodes,
                        },
                        "matched_sections": compact_records(result.matched_sections[:5]),
                        "matched_queries": compact_records(result.matched_queries[:5]),
                        "matched_sources": compact_records(result.matched_sources[:5]),
                    },
                )
            except Exception as log_exc:  # pragma: no cover - logging failures must not block the flow
                logger.warning(f"Knowledge Query: GraphRAG node failed to write log: {log_exc}")

            # 5. Record history
            history.add(decision, result)
... ... @@ -169,6 +199,22 @@ class GraphRAGQueryNode(BaseNode):
self.log_info(f"GraphRAG 查询完成,共 {len(all_results)} 轮,"
f"获取 {merged.get('total_nodes', 0)} 个节点")
try:
append_knowledge_log(
"GRAPH_QUERY_SUMMARY",
{
"chapter_id": chapter_id or "",
"chapter_title": chapter_title,
"rounds": len(all_results),
"total_nodes": merged.get("total_nodes", 0),
"matched_sections": compact_records(merged.get("matched_sections", [])[:10]),
"matched_queries": compact_records(merged.get("matched_queries", [])[:10]),
"matched_sources": compact_records(merged.get("matched_sources", [])[:10]),
"cross_engine_insights": merged.get("cross_engine_insights", []),
},
)
except Exception as log_exc: # pragma: no cover - 日志失败不阻塞流程
logger.warning(f"Knowledge Query: 汇总写日志失败: {log_exc}")
return merged
... ...
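For reference, given the line format defined in utils/knowledge_logger.py below, a per-round entry written by the node above would look roughly like this (illustrative values, not from the patch; a single line in the actual file):

[14:32:07] [KNOWLEDGE] [GRAPH_QUERY_NODE] {"chapter_id": "ch_01", "chapter_title": "Introduction", "round": 1, "params": {"keywords": ["GraphRAG"], "node_types": null, "engine_filter": null, "depth": 1}, "result_counts": {"matched_sections": 3, "matched_queries": 2, "matched_sources": 1, "total_nodes": 6}, ...}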
... ... @@ -24,6 +24,11 @@ from loguru import logger
import importlib
from pathlib import Path
from MindSpider.main import MindSpider
from utils.knowledge_logger import (
    append_knowledge_log,
    compact_records as _compact_records,
    init_knowledge_log,
)
# Import ReportEngine
try:
... ... @@ -364,72 +369,6 @@ def init_forum_log():
# Initialize forum.log
init_forum_log()

# ===== Knowledge base query log (format mirrors the Forum log) =====
knowledge_log_lock = threading.Lock()
KNOWLEDGE_LOG_FILE = LOG_DIR / "knowledge_query.log"

def _sanitize_log_text(text: str) -> str:
    """Strip newlines/carriage returns to prevent log pollution."""
    return str(text).replace("\n", " ").replace("\r", " ").strip()

def init_knowledge_log():
    """Initialize the knowledge query log file."""
    try:
        start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        KNOWLEDGE_LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
        with knowledge_log_lock, open(KNOWLEDGE_LOG_FILE, 'w', encoding='utf-8') as f:
            f.write(f"=== Knowledge Query Log initialized - {start_time} ===\n")
        logger.info("Knowledge Query: knowledge_query.log initialized")
    except Exception as exc:  # pragma: no cover - runtime only
        logger.exception(f"Knowledge Query: failed to initialize log: {exc}")

def append_knowledge_log(source: str, payload: dict):
    """Record knowledge query keywords and the full request payload without polluting the log."""
    try:
        timestamp = datetime.now().strftime('%H:%M:%S')
        clean_source = _sanitize_log_text(source or "UNKNOWN")
        # Serialize to JSON, then strip newlines so each entry stays on a single line
        serialized = json.dumps(payload, ensure_ascii=False)
        sanitized = _sanitize_log_text(serialized)
        with knowledge_log_lock, open(KNOWLEDGE_LOG_FILE, 'a', encoding='utf-8') as f:
            f.write(f"[{timestamp}] [KNOWLEDGE] [{clean_source}] {sanitized}\n")
    except Exception as exc:  # pragma: no cover - logging failures must not affect the main flow
        logger.warning(f"Knowledge Query: failed to write log: {exc}")

def _trim_text(text: str, limit: int = 300) -> str:
    text = _sanitize_log_text(text)
    return text if len(text) <= limit else text[:limit] + "..."

def _compact_records(items):
    """Compact nodes/records into a concise log format to avoid pollution."""
    compacted = []
    if not items:
        return compacted
    for item in items:
        if not isinstance(item, dict):
            compacted.append(_trim_text(str(item)))
            continue
        entry = {}
        for key, value in item.items():
            # Keep scalar fields as trimmed strings; compress other values into truncated JSON
            if isinstance(value, (str, int, float, bool)):
                entry[key] = _trim_text(str(value))
            else:
                try:
                    entry[key] = _trim_text(json.dumps(value, ensure_ascii=False))
                except Exception:
                    entry[key] = _trim_text(str(value))
        compacted.append(entry)
    return compacted

# Initialize knowledge_query.log
init_knowledge_log()
... ...
"""
统一的知识图谱查询日志记录工具。
用于在不同模块(Flask接口、GraphRAG 查询节点等)之间共享
knowledge_query.log 的写入逻辑,避免分散实现导致日志缺失。
"""
import json
import threading
from datetime import datetime
from pathlib import Path
from loguru import logger
# 日志文件路径
ROOT_DIR = Path(__file__).resolve().parent.parent
LOG_DIR = ROOT_DIR / "logs"
KNOWLEDGE_LOG_FILE = LOG_DIR / "knowledge_query.log"
_log_lock = threading.Lock()
def _sanitize_log_text(text: str) -> str:
"""移除换行/回车,防止日志污染。"""
return str(text).replace("\n", " ").replace("\r", " ").strip()
def _trim_text(text: str, limit: int = 300) -> str:
"""对长文本进行截断,避免日志过长。"""
text = _sanitize_log_text(text)
return text if len(text) <= limit else text[:limit] + "..."
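# Illustrative (assumed example, not part of the patch):
#   _trim_text("a" * 400) -> 300 "a" characters followed by "..."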
def compact_records(items):
    """
    Compact nodes/records into a concise log format so large fields
    do not pollute the log.
    """
    compacted = []
    if not items:
        return compacted
    for item in items:
        if not isinstance(item, dict):
            compacted.append(_trim_text(str(item)))
            continue
        entry = {}
        for key, value in item.items():
            if isinstance(value, (str, int, float, bool)):
                entry[key] = _trim_text(str(value))
            else:
                try:
                    entry[key] = _trim_text(json.dumps(value, ensure_ascii=False))
                except Exception:
                    entry[key] = _trim_text(str(value))
        compacted.append(entry)
    return compacted
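# Illustrative behavior (assumed example, not part of the patch): scalars are
# kept as trimmed strings, nested values are flattened to truncated JSON, e.g.
#   compact_records([{"title": "Intro", "score": 0.9, "meta": {"depth": 2}}])
#   -> [{"title": "Intro", "score": "0.9", "meta": '{"depth": 2}'}]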
def init_knowledge_log(force_reset: bool = True):
    """
    Initialize the knowledge query log file.

    Args:
        force_reset: if True, truncate the file and write an init marker;
            if False, create a missing file, otherwise append the marker
            without truncating existing content.
    """
    try:
        start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        LOG_DIR.mkdir(parents=True, exist_ok=True)
        mode = "w" if force_reset or not KNOWLEDGE_LOG_FILE.exists() else "a"
        with _log_lock, open(KNOWLEDGE_LOG_FILE, mode, encoding="utf-8") as f:
            f.write(f"=== Knowledge Query Log initialized - {start_time} ===\n")
        logger.info("Knowledge Query: knowledge_query.log initialized")
    except Exception as exc:  # pragma: no cover - runtime only
        logger.exception(f"Knowledge Query: failed to initialize log: {exc}")
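# Usage note (assumed call sites): application startup calls init_knowledge_log()
# to reset the file and write a fresh header, while _ensure_log_file below passes
# force_reset=False so a missing file is created without clearing existing logs.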
def _ensure_log_file():
    """Ensure the log file exists and is writable without overwriting existing content."""
    if not KNOWLEDGE_LOG_FILE.exists():
        init_knowledge_log(force_reset=False)

def append_knowledge_log(source: str, payload: dict):
    """Record knowledge query keywords and the full request payload."""
    try:
        _ensure_log_file()
        timestamp = datetime.now().strftime("%H:%M:%S")
        clean_source = _sanitize_log_text(source or "UNKNOWN")
        # default=str keeps one non-serializable value from silently dropping the entry
        serialized = json.dumps(payload, ensure_ascii=False, default=str)
        sanitized = _sanitize_log_text(serialized)
        with _log_lock, open(KNOWLEDGE_LOG_FILE, "a", encoding="utf-8") as f:
            f.write(f"[{timestamp}] [KNOWLEDGE] [{clean_source}] {sanitized}\n")
    except Exception as exc:  # pragma: no cover - logging failures must not affect the main flow
        logger.warning(f"Knowledge Query: failed to write log: {exc}")
... ...
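As a quick sanity check, a minimal usage sketch of the shared module (assumed invocation, not part of the patch; utils/ importable from the project root):

from utils.knowledge_logger import append_knowledge_log, compact_records, init_knowledge_log

init_knowledge_log()  # reset logs/knowledge_query.log and write a header
append_knowledge_log(
    "GRAPH_QUERY_NODE",
    {
        "chapter_id": "demo",
        "matched_sections": compact_records([{"title": "Intro", "meta": {"depth": 2}}]),
    },
)
# Appends a single sanitized line such as:
# [12:00:00] [KNOWLEDGE] [GRAPH_QUERY_NODE] {"chapter_id": "demo", "matched_sections": [{"title": "Intro", "meta": "{\"depth\": 2}"}]}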