state.py
5.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""
Report Engine state management.
Defines the simplified state data structures used during report generation.
"""
from dataclasses import dataclass, field
from typing import Dict, Any, Optional
import json
from datetime import datetime
@dataclass
class ReportMetadata:
    """Lightweight metadata describing one generated report."""

    query: str = ""  # original query text
    template_used: str = ""  # name of the template that was used
    generation_time: float = 0.0  # generation time in seconds
    # ISO-8601 string taken from the local clock when the instance is created.
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())

    def to_dict(self) -> Dict[str, Any]:
        """Return the metadata as a plain dictionary.

        Returns:
            A dict with the keys ``query``, ``template_used``,
            ``generation_time`` and ``timestamp``, in that order.
        """
        exported = ("query", "template_used", "generation_time", "timestamp")
        return {attr: getattr(self, attr) for attr in exported}
@dataclass
class ReportState:
    """Simplified state of a single report-generation task.

    Stores the task's basic information, its inputs, its outputs and its
    metadata so that the Agent and the Flask layer can share one object.
    """

    # Basic information
    task_id: str = ""  # task ID; generated in __post_init__ when left empty
    query: str = ""  # original query
    status: str = "pending"  # one of: pending, processing, completed, failed

    # Input data
    query_engine_report: str = ""  # QueryEngine report
    media_engine_report: str = ""  # MediaEngine report
    insight_engine_report: str = ""  # InsightEngine report
    forum_logs: str = ""  # forum logs

    # Processing results
    selected_template: str = ""  # chosen template
    html_content: str = ""  # final HTML content

    # Metadata
    metadata: ReportMetadata = field(default_factory=ReportMetadata)

    # Last error message recorded by mark_failed(). Declared as a real field
    # (and placed last, so positional construction is unchanged) so that it
    # always exists, even before any failure has been recorded.
    error_message: str = ""

    def __post_init__(self):
        """Fill in a default task ID and mirror the query into the metadata."""
        if not self.task_id:
            # Second-resolution timestamp of the local clock, e.g.
            # "report_20240101_120000".
            self.task_id = f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        self.metadata.query = self.query

    def mark_processing(self):
        """Set the status to ``"processing"``."""
        self.status = "processing"

    def mark_completed(self):
        """Set the status to ``"completed"``."""
        self.status = "completed"

    def mark_failed(self, error_message: str = ""):
        """Set the status to ``"failed"`` and record the error message.

        Args:
            error_message: Description of the failure; stored in
                ``self.error_message``.
        """
        self.status = "failed"
        self.error_message = error_message

    def is_completed(self) -> bool:
        """Return True when the status is completed and HTML content exists."""
        return self.status == "completed" and bool(self.html_content)

    def get_progress(self) -> float:
        """Return a rough progress percentage.

        Returns:
            ``100.0`` when completed; while processing, 30 points for a
            selected template plus 70 points for existing HTML content;
            ``0.0`` for every other status.
        """
        if self.status == "completed":
            return 100.0
        if self.status == "processing":
            progress = 0.0
            if self.selected_template:
                progress += 30.0
            if self.html_content:
                progress += 70.0
            return progress
        return 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable summary of the state.

        The HTML body itself is not included; only a presence flag and its
        length are reported.
        """
        return {
            "task_id": self.task_id,
            "query": self.query,
            "status": self.status,
            "progress": self.get_progress(),
            "selected_template": self.selected_template,
            "has_html_content": bool(self.html_content),
            "html_content_length": len(self.html_content) if self.html_content else 0,
            "error_message": self.error_message,
            "metadata": self.metadata.to_dict(),
        }

    def save_to_file(self, file_path: str):
        """Write the state summary to ``file_path`` as UTF-8 JSON.

        The payload comes from :meth:`to_dict`, which never contains the full
        HTML body, so the state file stays small. Failures are reported with
        ``print`` and otherwise ignored (best effort).

        Args:
            file_path: Destination path of the JSON file.
        """
        try:
            state_data = self.to_dict()
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(state_data, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"保存状态文件失败: {str(e)}")

    @classmethod
    def load_from_file(cls, file_path: str) -> Optional["ReportState"]:
        """Load a state previously written by :meth:`save_to_file`.

        Only the fields present in the saved summary are restored; input
        reports and the HTML content are not part of the file.

        Args:
            file_path: Path of the JSON state file.

        Returns:
            The restored ``ReportState``, or ``None`` when the file cannot be
            read or parsed (the error is reported with ``print``).
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            state = cls(
                task_id=data.get("task_id", ""),
                query=data.get("query", ""),
                status=data.get("status", "pending"),
                selected_template=data.get("selected_template", ""),
                error_message=data.get("error_message", ""),
            )

            # Tolerate a missing or null "metadata" entry.
            metadata_data = data.get("metadata") or {}
            state.metadata.template_used = metadata_data.get("template_used", "")
            state.metadata.generation_time = metadata_data.get("generation_time", 0.0)
            # Keep the original creation timestamp instead of the "now" value
            # that the default factory produced while constructing `state`.
            saved_timestamp = metadata_data.get("timestamp")
            if saved_timestamp:
                state.metadata.timestamp = saved_timestamp
            return state
        except Exception as e:
            print(f"加载状态文件失败: {str(e)}")
            return None