agent.py
63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
"""
Report Agent主类。
该模块串联模板选择、布局设计、章节生成、IR装订与HTML渲染等
所有子流程,是Report Engine的总调度中心。核心职责包括:
1. 管理输入数据与状态,协调三个分析引擎、论坛日志与模板;
2. 按节点顺序驱动模板选择→布局生成→篇幅规划→章节写作→装订渲染;
3. 负责错误兜底、流式事件分发、落盘清单与最终成果保存。
"""
import json
import os
from copy import deepcopy
from pathlib import Path
from uuid import uuid4
from datetime import datetime
from typing import Optional, Dict, Any, List, Callable, Tuple
from loguru import logger
from .core import (
ChapterStorage,
DocumentComposer,
TemplateSection,
parse_template_sections,
)
from .ir import IRValidator
from .llms import LLMClient
from .nodes import (
TemplateSelectionNode,
ChapterGenerationNode,
ChapterJsonParseError,
ChapterContentError,
ChapterValidationError,
DocumentLayoutNode,
WordBudgetNode,
)
from .renderers import HTMLRenderer
from .state import ReportState
from .utils.config import settings, Settings
class StageOutputFormatError(ValueError):
"""阶段性输出结构不符合预期时抛出的受控异常。"""
class FileCountBaseline:
"""
文件数量基准管理器。
该工具用于:
- 在任务启动时记录 Insight/Media/Query 三个引擎导出的 Markdown 数量;
- 在后续轮询中快速判断是否有新报告落地;
- 为 Flask 层提供“输入是否准备完毕”的依据。
"""
def __init__(self):
"""
初始化时优先尝试读取既有的基准快照。
若 `logs/report_baseline.json` 不存在则会自动创建一份空快照,
以便后续 `initialize_baseline` 在首次运行时写入真实基准。
"""
self.baseline_file = 'logs/report_baseline.json'
self.baseline_data = self._load_baseline()
def _load_baseline(self) -> Dict[str, int]:
"""
加载基准数据。
- 当快照文件存在时直接解析JSON;
- 捕获所有加载异常并返回空字典,保证调用方逻辑简洁。
"""
try:
if os.path.exists(self.baseline_file):
with open(self.baseline_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.exception(f"加载基准数据失败: {e}")
return {}
def _save_baseline(self):
"""
将当前基准写入磁盘。
采用 `ensure_ascii=False` + 缩进格式,方便人工查看;
若目标目录缺失则自动创建。
"""
try:
os.makedirs(os.path.dirname(self.baseline_file), exist_ok=True)
with open(self.baseline_file, 'w', encoding='utf-8') as f:
json.dump(self.baseline_data, f, ensure_ascii=False, indent=2)
except Exception as e:
logger.exception(f"保存基准数据失败: {e}")
def initialize_baseline(self, directories: Dict[str, str]) -> Dict[str, int]:
"""
初始化文件数量基准。
遍历每个引擎目录并统计 `.md` 文件数量,将结果持久化为
初始基准。后续 `check_new_files` 会据此对比增量。
"""
current_counts = {}
for engine, directory in directories.items():
if os.path.exists(directory):
md_files = [f for f in os.listdir(directory) if f.endswith('.md')]
current_counts[engine] = len(md_files)
else:
current_counts[engine] = 0
# 保存基准数据
self.baseline_data = current_counts.copy()
self._save_baseline()
logger.info(f"文件数量基准已初始化: {current_counts}")
return current_counts
def check_new_files(self, directories: Dict[str, str]) -> Dict[str, Any]:
"""
检查是否有新文件。
对比当前目录文件数与基准:
- 统计新增数量,并判定是否所有引擎都已准备就绪;
- 返回详细计数、缺失列表,供 Web 层提示给用户。
"""
current_counts = {}
new_files_found = {}
all_have_new = True
for engine, directory in directories.items():
if os.path.exists(directory):
md_files = [f for f in os.listdir(directory) if f.endswith('.md')]
current_counts[engine] = len(md_files)
baseline_count = self.baseline_data.get(engine, 0)
if current_counts[engine] > baseline_count:
new_files_found[engine] = current_counts[engine] - baseline_count
else:
new_files_found[engine] = 0
all_have_new = False
else:
current_counts[engine] = 0
new_files_found[engine] = 0
all_have_new = False
return {
'ready': all_have_new,
'baseline_counts': self.baseline_data,
'current_counts': current_counts,
'new_files_found': new_files_found,
'missing_engines': [engine for engine, count in new_files_found.items() if count == 0]
}
def get_latest_files(self, directories: Dict[str, str]) -> Dict[str, str]:
"""
获取每个目录的最新文件。
通过 `os.path.getmtime` 找出最近写入的 Markdown,
以确保生成流程永远使用最新一版三引擎报告。
"""
latest_files = {}
for engine, directory in directories.items():
if os.path.exists(directory):
md_files = [f for f in os.listdir(directory) if f.endswith('.md')]
if md_files:
latest_file = max(md_files, key=lambda x: os.path.getmtime(os.path.join(directory, x)))
latest_files[engine] = os.path.join(directory, latest_file)
return latest_files
class ReportAgent:
"""
Report Agent主类。
负责集成:
- LLM客户端及其上层四个推理节点;
- 章节存储、IR装订、渲染器等产出链路;
- 状态管理、日志、输入输出校验与持久化。
"""
_CONTENT_SPARSE_MIN_ATTEMPTS = 3
_CONTENT_SPARSE_WARNING_TEXT = "本章LLM生成的内容字数可能过低,必要时可以尝试重新运行程序。"
_STRUCTURAL_RETRY_ATTEMPTS = 2
def __init__(self, config: Optional[Settings] = None):
"""
初始化Report Agent。
Args:
config: 配置对象,如果不提供则自动加载
步骤概览:
1. 解析配置并接入日志/LLM/渲染等核心组件;
2. 构造四个推理节点(模板、布局、篇幅、章节);
3. 初始化文件基准与章节落盘目录;
4. 构建可序列化的状态容器,供外部服务查询。
"""
# 加载配置
self.config = config or settings
# 初始化文件基准管理器
self.file_baseline = FileCountBaseline()
# 初始化日志
self._setup_logging()
# 初始化LLM客户端
self.llm_client = self._initialize_llm()
self.json_rescue_clients = self._initialize_rescue_llms()
# 初始化章级存储/校验/渲染组件
self.chapter_storage = ChapterStorage(self.config.CHAPTER_OUTPUT_DIR)
self.document_composer = DocumentComposer()
self.validator = IRValidator()
self.renderer = HTMLRenderer()
# 初始化节点
self._initialize_nodes()
# 初始化文件数量基准
self._initialize_file_baseline()
# 状态
self.state = ReportState()
# 确保输出目录存在
os.makedirs(self.config.OUTPUT_DIR, exist_ok=True)
os.makedirs(self.config.DOCUMENT_IR_OUTPUT_DIR, exist_ok=True)
logger.info("Report Agent已初始化")
logger.info(f"使用LLM: {self.llm_client.get_model_info()}")
def _setup_logging(self):
"""
设置日志。
- 确保日志目录存在;
- 使用独立的 loguru sink 写入 Report Engine 专属 log 文件,
避免与其他子系统混淆。
- 【修复】配置实时日志写入,禁用缓冲,确保前端实时看到日志
- 【修复】防止重复添加handler
"""
# 确保日志目录存在
log_dir = os.path.dirname(self.config.LOG_FILE)
os.makedirs(log_dir, exist_ok=True)
def _exclude_other_engines(record):
"""
过滤掉其他引擎(Insight/Media/Query/Forum)产生的日志,其余日志全部保留。
使用路径匹配为主,无法获取路径时退化到模块名。
"""
excluded_keywords = ("InsightEngine", "MediaEngine", "QueryEngine", "ForumEngine")
try:
file_path = record["file"].path
if any(keyword in file_path for keyword in excluded_keywords):
return False
except Exception:
pass
try:
module_name = record.get("module", "")
if isinstance(module_name, str):
lowered = module_name.lower()
if any(keyword.lower() in lowered for keyword in excluded_keywords):
return False
except Exception:
pass
return True
# 【修复】检查是否已经添加过这个文件的handler,避免重复
# loguru会自动去重,但显式检查更安全
log_file_path = str(Path(self.config.LOG_FILE).resolve())
# 检查现有的handlers
handler_exists = False
for handler_id, handler_config in logger._core.handlers.items():
if hasattr(handler_config, 'sink'):
sink = handler_config.sink
# 检查是否是文件sink且路径相同
if hasattr(sink, '_name') and sink._name == log_file_path:
handler_exists = True
logger.debug(f"日志handler已存在,跳过添加: {log_file_path}")
break
if not handler_exists:
# 【修复】创建专用的logger,配置实时写入
# - enqueue=False: 禁用异步队列,立即写入
# - buffering=1: 行缓冲,每条日志立即刷新到文件
# - level="DEBUG": 记录所有级别的日志
# - encoding="utf-8": 明确指定UTF-8编码
# - mode="a": 追加模式,保留历史日志
handler_id = logger.add(
self.config.LOG_FILE,
level="DEBUG",
enqueue=False, # 禁用异步队列,同步写入
buffering=1, # 行缓冲,每行立即写入
serialize=False, # 普通文本格式,不序列化为JSON
encoding="utf-8", # 明确UTF-8编码
mode="a", # 追加模式
filter=_exclude_other_engines # 过滤掉四个 Engine 的日志,保留其余信息
)
logger.debug(f"已添加日志handler (ID: {handler_id}): {self.config.LOG_FILE}")
# 【修复】验证日志文件可写
try:
with open(self.config.LOG_FILE, 'a', encoding='utf-8') as f:
f.write('') # 尝试写入空字符串验证权限
f.flush() # 立即刷新
except Exception as e:
logger.error(f"日志文件无法写入: {self.config.LOG_FILE}, 错误: {e}")
raise
def _initialize_file_baseline(self):
"""
初始化文件数量基准。
将 Insight/Media/Query 三个目录传入 `FileCountBaseline`,
生成一次性的参考值,之后按增量判断三引擎是否产出新报告。
"""
directories = {
'insight': 'insight_engine_streamlit_reports',
'media': 'media_engine_streamlit_reports',
'query': 'query_engine_streamlit_reports'
}
self.file_baseline.initialize_baseline(directories)
def _initialize_llm(self) -> LLMClient:
"""
初始化LLM客户端。
利用配置中的 API Key / 模型 / Base URL 构建统一的
`LLMClient` 实例,为所有节点提供复用的推理入口。
"""
return LLMClient(
api_key=self.config.REPORT_ENGINE_API_KEY,
model_name=self.config.REPORT_ENGINE_MODEL_NAME,
base_url=self.config.REPORT_ENGINE_BASE_URL,
)
def _initialize_rescue_llms(self) -> List[Tuple[str, LLMClient]]:
"""
初始化跨引擎章节修复所需的LLM客户端列表。
顺序遵循“Report → Forum → Insight → Media”,缺失配置会被自动跳过。
"""
clients: List[Tuple[str, LLMClient]] = []
if self.llm_client:
clients.append(("report_engine", self.llm_client))
fallback_specs = [
(
"forum_engine",
self.config.FORUM_HOST_API_KEY,
self.config.FORUM_HOST_MODEL_NAME,
self.config.FORUM_HOST_BASE_URL,
),
(
"insight_engine",
self.config.INSIGHT_ENGINE_API_KEY,
self.config.INSIGHT_ENGINE_MODEL_NAME,
self.config.INSIGHT_ENGINE_BASE_URL,
),
(
"media_engine",
self.config.MEDIA_ENGINE_API_KEY,
self.config.MEDIA_ENGINE_MODEL_NAME,
self.config.MEDIA_ENGINE_BASE_URL,
),
]
for label, api_key, model_name, base_url in fallback_specs:
if not api_key or not model_name:
continue
try:
client = LLMClient(api_key=api_key, model_name=model_name, base_url=base_url)
except Exception as exc:
logger.warning(f"{label} LLM初始化失败,跳过该修复通道: {exc}")
continue
clients.append((label, client))
return clients
def _initialize_nodes(self):
"""
初始化处理节点。
顺序实例化模板选择、文档布局、篇幅规划、章节生成四个节点,
其中章节节点额外依赖 IR 校验器与章节存储器。
"""
self.template_selection_node = TemplateSelectionNode(
self.llm_client,
self.config.TEMPLATE_DIR
)
self.document_layout_node = DocumentLayoutNode(self.llm_client)
self.word_budget_node = WordBudgetNode(self.llm_client)
self.chapter_generation_node = ChapterGenerationNode(
self.llm_client,
self.validator,
self.chapter_storage,
fallback_llm_clients=self.json_rescue_clients,
error_log_dir=self.config.JSON_ERROR_LOG_DIR,
)
def generate_report(self, query: str, reports: List[Any], forum_logs: str = "",
custom_template: str = "", save_report: bool = True,
stream_handler: Optional[Callable[[str, Dict[str, Any]], None]] = None) -> str:
"""
生成综合报告(章节JSON → IR → HTML)。
主要阶段:
1. 归一化三引擎报告 + 论坛日志,并输出流式事件;
2. 模板选择 → 模板切片 → 文档布局 → 篇幅规划;
3. 结合篇幅目标逐章调用LLM,遇到解析错误会自动重试;
4. 将章节装订成Document IR,再交给HTML渲染器生成成品;
5. 可选地将HTML/IR/状态落盘,并向外界回传路径信息。
参数:
query: 最终要生成的报告主题或提问语句。
reports: 来自 Query/Media/Insight 等分析引擎的原始输出,允许传入字符串或更复杂的对象。
forum_logs: 论坛/协同记录,供LLM理解多人讨论上下文。
custom_template: 用户指定的Markdown模板,如为空则交由模板节点自动挑选。
save_report: 是否在生成后自动将HTML、IR与状态写入磁盘。
stream_handler: 可选的流式事件回调,接收阶段标签与payload,用于UI实时展示。
返回:
dict: 包含 `html_content` 以及HTML/IR/状态文件路径的字典;若 `save_report=False` 则仅返回HTML字符串。
异常:
Exception: 任一子节点或渲染阶段失败时抛出,外层调用方负责兜底。
"""
start_time = datetime.now()
report_id = f"report-{uuid4().hex[:8]}"
self.state.task_id = report_id
self.state.query = query
self.state.metadata.query = query
self.state.mark_processing()
normalized_reports = self._normalize_reports(reports)
def emit(event_type: str, payload: Dict[str, Any]):
"""面向Report Engine流通道的事件分发器,保证错误不外泄。"""
if not stream_handler:
return
try:
stream_handler(event_type, payload)
except Exception as callback_error: # pragma: no cover - 仅记录
logger.warning(f"流式事件回调失败: {callback_error}")
logger.info(f"开始生成报告 {report_id}: {query}")
logger.info(f"输入数据 - 报告数量: {len(reports)}, 论坛日志长度: {len(str(forum_logs))}")
emit('stage', {'stage': 'agent_start', 'report_id': report_id, 'query': query})
try:
template_result = self._select_template(query, reports, forum_logs, custom_template)
template_result = self._ensure_mapping(
template_result,
"模板选择结果",
expected_keys=["template_name", "template_content"],
)
self.state.metadata.template_used = template_result.get('template_name', '')
emit('stage', {
'stage': 'template_selected',
'template': template_result.get('template_name'),
'reason': template_result.get('selection_reason')
})
emit('progress', {'progress': 10, 'message': '模板选择完成'})
sections = self._slice_template(template_result.get('template_content', ''))
if not sections:
raise ValueError("模板无法解析出章节,请检查模板内容。")
emit('stage', {'stage': 'template_sliced', 'section_count': len(sections)})
template_text = template_result.get('template_content', '')
template_overview = self._build_template_overview(template_text, sections)
# 基于模板骨架+三引擎内容设计全局标题、目录与视觉主题
layout_design = self._run_stage_with_retry(
"文档设计",
lambda: self.document_layout_node.run(
sections,
template_text,
normalized_reports,
forum_logs,
query,
template_overview,
),
# toc 字段已被 tocPlan 取代,这里按最新Schema挑选/校验
expected_keys=["title", "hero", "tocPlan", "tocTitle"],
)
emit('stage', {
'stage': 'layout_designed',
'title': layout_design.get('title'),
'toc': layout_design.get('tocTitle')
})
emit('progress', {'progress': 15, 'message': '文档标题/目录设计完成'})
# 使用刚生成的设计稿对全书进行篇幅规划,约束各章字数与重点
word_plan = self._run_stage_with_retry(
"章节篇幅规划",
lambda: self.word_budget_node.run(
sections,
layout_design,
normalized_reports,
forum_logs,
query,
template_overview,
),
expected_keys=["chapters", "totalWords", "globalGuidelines"],
postprocess=self._normalize_word_plan,
)
emit('stage', {
'stage': 'word_plan_ready',
'chapter_targets': len(word_plan.get('chapters', []))
})
emit('progress', {'progress': 20, 'message': '章节字数规划已生成'})
# 记录每个章节的目标字数/强调点,后续传给章节LLM
chapter_targets = {
entry.get("chapterId"): entry
for entry in word_plan.get("chapters", [])
if entry.get("chapterId")
}
generation_context = self._build_generation_context(
query,
normalized_reports,
forum_logs,
template_result,
layout_design,
chapter_targets,
word_plan,
template_overview,
)
# IR/渲染需要的全局元数据,带上设计稿给出的标题/主题/目录/篇幅信息
manifest_meta = {
"query": query,
"title": layout_design.get("title") or (f"{query} - 舆情洞察报告" if query else template_result.get("template_name")),
"subtitle": layout_design.get("subtitle"),
"tagline": layout_design.get("tagline"),
"templateName": template_result.get("template_name"),
"selectionReason": template_result.get("selection_reason"),
"themeTokens": generation_context.get("theme_tokens", {}),
"toc": {
"depth": 3,
"autoNumbering": True,
"title": layout_design.get("tocTitle") or "目录",
},
"hero": layout_design.get("hero"),
"layoutNotes": layout_design.get("layoutNotes"),
"wordPlan": {
"totalWords": word_plan.get("totalWords"),
"globalGuidelines": word_plan.get("globalGuidelines"),
},
"templateOverview": template_overview,
}
if layout_design.get("themeTokens"):
manifest_meta["themeTokens"] = layout_design["themeTokens"]
if layout_design.get("tocPlan"):
manifest_meta["toc"]["customEntries"] = layout_design["tocPlan"]
# 初始化章节输出目录并写入manifest,方便流式存盘
run_dir = self.chapter_storage.start_session(report_id, manifest_meta)
self._persist_planning_artifacts(run_dir, layout_design, word_plan, template_overview)
emit('stage', {'stage': 'storage_ready', 'run_dir': str(run_dir)})
chapters = []
chapter_max_attempts = max(
self._CONTENT_SPARSE_MIN_ATTEMPTS, self.config.CHAPTER_JSON_MAX_ATTEMPTS
)
total_chapters = len(sections) # 总章节数
completed_chapters = 0 # 已完成章节数
for section in sections:
logger.info(f"生成章节: {section.title}")
emit('chapter_status', {
'chapterId': section.chapter_id,
'title': section.title,
'status': 'running'
})
# 章节流式回调:把LLM返回的delta透传给SSE,便于前端实时渲染
def chunk_callback(delta: str, meta: Dict[str, Any], section_ref: TemplateSection = section):
"""
章节内容流式回调。
Args:
delta: LLM最新输出的增量文本。
meta: 节点回传的章节元数据,兜底时使用。
section_ref: 默认指向当前章节,保证在缺失元信息时也能定位。
"""
emit('chapter_chunk', {
'chapterId': meta.get('chapterId') or section_ref.chapter_id,
'title': meta.get('title') or section_ref.title,
'delta': delta
})
chapter_payload: Dict[str, Any] | None = None
attempt = 1
best_sparse_candidate: Dict[str, Any] | None = None
best_sparse_score = -1
fallback_used = False
while attempt <= chapter_max_attempts:
try:
chapter_payload = self.chapter_generation_node.run(
section,
generation_context,
run_dir,
stream_callback=chunk_callback
)
break
except (ChapterJsonParseError, ChapterContentError, ChapterValidationError) as structured_error:
if isinstance(structured_error, ChapterContentError):
error_kind = "content_sparse"
readable_label = "内容密度异常"
elif isinstance(structured_error, ChapterValidationError):
error_kind = "validation"
readable_label = "结构校验失败"
else:
error_kind = "json_parse"
readable_label = "JSON解析失败"
if isinstance(structured_error, ChapterContentError):
candidate = getattr(structured_error, "chapter_payload", None)
candidate_score = getattr(structured_error, "body_characters", 0) or 0
if isinstance(candidate, dict) and candidate_score >= 0:
if candidate_score > best_sparse_score:
best_sparse_candidate = deepcopy(candidate)
best_sparse_score = candidate_score
will_fallback = (
isinstance(structured_error, ChapterContentError)
and attempt >= chapter_max_attempts
and attempt >= self._CONTENT_SPARSE_MIN_ATTEMPTS
and best_sparse_candidate is not None
)
logger.warning(
"章节 {title} {label}(第 {attempt}/{total} 次尝试): {error}",
title=section.title,
label=readable_label,
attempt=attempt,
total=chapter_max_attempts,
error=structured_error,
)
status_value = 'retrying' if attempt < chapter_max_attempts or will_fallback else 'error'
status_payload = {
'chapterId': section.chapter_id,
'title': section.title,
'status': status_value,
'attempt': attempt,
'error': str(structured_error),
'reason': error_kind,
}
if isinstance(structured_error, ChapterValidationError):
validation_errors = getattr(structured_error, "errors", None)
if validation_errors:
status_payload['errors'] = validation_errors
if will_fallback:
status_payload['warning'] = 'content_sparse_fallback_pending'
emit('chapter_status', status_payload)
if will_fallback:
logger.warning(
"章节 {title} 达到最大尝试次数,保留字数最多(约 {score} 字)的版本作为兜底输出",
title=section.title,
score=best_sparse_score,
)
chapter_payload = self._finalize_sparse_chapter(best_sparse_candidate)
fallback_used = True
break
if attempt >= chapter_max_attempts:
raise
attempt += 1
continue
except (AttributeError, TypeError, KeyError, IndexError, ValueError, json.JSONDecodeError) as structure_error:
# 捕获因 JSON 结构异常导致的运行时错误,包装为可重试异常
# 包括:
# - AttributeError: 如 list.get() 调用失败
# - TypeError: 类型不匹配
# - KeyError: 字典键缺失
# - IndexError: 列表索引越界
# - ValueError: 值错误(如 LLM 返回空内容、缺少必要字段)
# - json.JSONDecodeError: JSON 解析失败(未被内部捕获的情况)
error_type = type(structure_error).__name__
logger.warning(
"章节 {title} 生成过程中发生 {error_type}(第 {attempt}/{total} 次尝试),将尝试重新生成: {error}",
title=section.title,
error_type=error_type,
attempt=attempt,
total=chapter_max_attempts,
error=structure_error,
)
emit('chapter_status', {
'chapterId': section.chapter_id,
'title': section.title,
'status': 'retrying' if attempt < chapter_max_attempts else 'error',
'attempt': attempt,
'error': str(structure_error),
'reason': 'structure_error',
'error_type': error_type
})
if attempt >= chapter_max_attempts:
# 达到最大重试次数,包装为 ChapterJsonParseError 抛出
raise ChapterJsonParseError(
f"{section.title} 章节因 {error_type} 在 {chapter_max_attempts} 次尝试后仍无法生成: {structure_error}"
) from structure_error
attempt += 1
continue
except Exception as chapter_error:
if not self._should_retry_inappropriate_content_error(chapter_error):
raise
logger.warning(
"章节 {title} 触发内容安全限制(第 {attempt}/{total} 次尝试),准备重新生成: {error}",
title=section.title,
attempt=attempt,
total=chapter_max_attempts,
error=chapter_error,
)
emit('chapter_status', {
'chapterId': section.chapter_id,
'title': section.title,
'status': 'retrying' if attempt < chapter_max_attempts else 'error',
'attempt': attempt,
'error': str(chapter_error),
'reason': 'content_filter'
})
if attempt >= chapter_max_attempts:
raise
attempt += 1
continue
if chapter_payload is None:
raise ChapterJsonParseError(
f"{section.title} 章节JSON在 {chapter_max_attempts} 次尝试后仍无法解析"
)
chapters.append(chapter_payload)
completed_chapters += 1 # 更新已完成章节数
# 计算当前进度:20% + 80% * (已完成章节数 / 总章节数),四舍五入
chapter_progress = 20 + round(80 * completed_chapters / total_chapters)
emit('progress', {
'progress': chapter_progress,
'message': f'章节 {completed_chapters}/{total_chapters} 已完成'
})
completion_status = {
'chapterId': section.chapter_id,
'title': section.title,
'status': 'completed',
'attempt': attempt,
}
if fallback_used:
completion_status['warning'] = 'content_sparse_fallback'
completion_status['warningMessage'] = self._CONTENT_SPARSE_WARNING_TEXT
emit('chapter_status', completion_status)
document_ir = self.document_composer.build_document(
report_id,
manifest_meta,
chapters
)
emit('stage', {'stage': 'chapters_compiled', 'chapter_count': len(chapters)})
html_report = self.renderer.render(document_ir)
emit('stage', {'stage': 'html_rendered', 'html_length': len(html_report)})
self.state.html_content = html_report
self.state.mark_completed()
saved_files = {}
if save_report:
saved_files = self._save_report(html_report, document_ir, report_id)
emit('stage', {'stage': 'report_saved', 'files': saved_files})
generation_time = (datetime.now() - start_time).total_seconds()
self.state.metadata.generation_time = generation_time
logger.info(f"报告生成完成,耗时: {generation_time:.2f} 秒")
emit('metrics', {'generation_seconds': generation_time})
return {
'html_content': html_report,
'report_id': report_id,
**saved_files
}
except Exception as e:
self.state.mark_failed(str(e))
logger.exception(f"报告生成过程中发生错误: {str(e)}")
emit('error', {'stage': 'agent_failed', 'message': str(e)})
raise
def _select_template(self, query: str, reports: List[Any], forum_logs: str, custom_template: str):
"""
选择报告模板。
优先使用用户指定的模板;否则将查询、三引擎报告与论坛日志
作为上下文交给 TemplateSelectionNode,由 LLM 返回最契合的
模板名称、内容及理由,并自动记录在状态中。
参数:
query: 报告主题,用于提示词聚焦行业/事件。
reports: 多来源报告原文,帮助LLM判断结构复杂度。
forum_logs: 对应论坛或协作讨论的文本,用于补充背景。
custom_template: CLI/前端传入的自定义Markdown模板,非空时直接采用。
返回:
dict: 包含 `template_name`、`template_content` 与 `selection_reason` 的结构化结果,供后续节点消费。
"""
logger.info("选择报告模板...")
# 如果用户提供了自定义模板,直接使用
if custom_template:
logger.info("使用用户自定义模板")
return {
'template_name': 'custom',
'template_content': custom_template,
'selection_reason': '用户指定的自定义模板'
}
template_input = {
'query': query,
'reports': reports,
'forum_logs': forum_logs
}
try:
template_result = self.template_selection_node.run(template_input)
# 更新状态
self.state.metadata.template_used = template_result['template_name']
logger.info(f"选择模板: {template_result['template_name']}")
logger.info(f"选择理由: {template_result['selection_reason']}")
return template_result
except Exception as e:
logger.error(f"模板选择失败,使用默认模板: {str(e)}")
# 直接使用备用模板
fallback_template = {
'template_name': '社会公共热点事件分析报告模板',
'template_content': self._get_fallback_template_content(),
'selection_reason': '模板选择失败,使用默认社会热点事件分析模板'
}
self.state.metadata.template_used = fallback_template['template_name']
return fallback_template
def _slice_template(self, template_markdown: str) -> List[TemplateSection]:
"""
将模板切成章节列表,若为空则提供fallback。
委托 `parse_template_sections` 将Markdown标题/编号解析为
`TemplateSection` 列表,确保后续章节生成有稳定的章节ID。
当模板格式异常时,会回退到内置的简单骨架避免崩溃。
参数:
template_markdown: 完整的模板Markdown文本。
返回:
list[TemplateSection]: 解析后的章节序列;如解析失败则返回单章兜底结构。
"""
sections = parse_template_sections(template_markdown)
if sections:
return sections
logger.warning("模板未解析出章节,使用默认章节骨架")
fallback = TemplateSection(
title="1.0 综合分析",
slug="section-1-0",
order=10,
depth=1,
raw_title="1.0 综合分析",
number="1.0",
chapter_id="S1",
outline=["1.1 摘要", "1.2 数据亮点", "1.3 风险提示"],
)
return [fallback]
def _build_generation_context(
self,
query: str,
reports: Dict[str, str],
forum_logs: str,
template_result: Dict[str, Any],
layout_design: Dict[str, Any],
chapter_directives: Dict[str, Any],
word_plan: Dict[str, Any],
template_overview: Dict[str, Any],
) -> Dict[str, Any]:
"""
构造章节生成所需的共享上下文。
将模板名称、布局设计、主题配色、篇幅规划、论坛日志等
一次性整合为 `generation_context`,后续每章调用 LLM 时
直接复用,确保所有章节共享一致的语调和视觉约束。
参数:
query: 用户查询词。
reports: 归一化后的 query/media/insight 报告映射。
forum_logs: 三引擎讨论记录。
template_result: 模板节点返回的模板元信息。
layout_design: 文档布局节点产出的标题/目录/主题设计。
chapter_directives: 字数规划节点返回的章节指令映射。
word_plan: 篇幅规划原始结果,包含全局字数约束。
template_overview: 模板切片提炼的章节骨架摘要。
返回:
dict: LLM章节生成所需的全集上下文,包含主题色、布局、约束等键。
"""
# 优先使用设计稿定制的主题色,否则退回默认主题
theme_tokens = (
layout_design.get("themeTokens")
if layout_design else None
) or self._default_theme_tokens()
return {
"query": query,
"template_name": template_result.get("template_name"),
"reports": reports,
"forum_logs": self._stringify(forum_logs),
"theme_tokens": theme_tokens,
"style_directives": {
"tone": "analytical",
"audience": "executive",
"language": "zh-CN",
},
"data_bundles": [],
"max_tokens": min(self.config.MAX_CONTENT_LENGTH, 6000),
"layout": layout_design or {},
"template_overview": template_overview or {},
"chapter_directives": chapter_directives or {},
"word_plan": word_plan or {},
}
def _normalize_reports(self, reports: List[Any]) -> Dict[str, str]:
"""
将不同来源的报告统一转为字符串。
约定顺序为 Query/Media/Insight,引擎提供的对象可能是
字典或自定义类型,因此统一走 `_stringify` 做容错。
参数:
reports: 任意类型的报告列表,允许缺失或顺序混乱。
返回:
dict: 包含 `query_engine`/`media_engine`/`insight_engine` 三个字符串字段的映射。
"""
keys = ["query_engine", "media_engine", "insight_engine"]
normalized: Dict[str, str] = {}
for idx, key in enumerate(keys):
value = reports[idx] if idx < len(reports) else ""
normalized[key] = self._stringify(value)
return normalized
def _should_retry_inappropriate_content_error(self, error: Exception) -> bool:
"""
判断LLM异常是否由内容安全/不当内容导致。
当检测到供应商返回的错误包含特定关键词时,允许章节生成
重新尝试,以便绕过偶发的内容审查触发。
参数:
error: LLM客户端抛出的异常对象。
返回:
bool: 若匹配到内容审查关键词则返回True,否则为False。
"""
message = str(error) if error else ""
if not message:
return False
normalized = message.lower()
keywords = [
"inappropriate content",
"content violation",
"content moderation",
"model-studio/error-code",
]
return any(keyword in normalized for keyword in keywords)
def _run_stage_with_retry(
self,
stage_name: str,
fn: Callable[[], Any],
expected_keys: Optional[List[str]] = None,
postprocess: Optional[Callable[[Dict[str, Any], str], Dict[str, Any]]] = None,
) -> Dict[str, Any]:
"""
运行单个LLM阶段并在结构异常时有限次重试。
该方法只针对结构类错误做本地修复/重试,避免整个Agent重启。
"""
last_error: Optional[Exception] = None
for attempt in range(1, self._STRUCTURAL_RETRY_ATTEMPTS + 1):
try:
raw_result = fn()
result = self._ensure_mapping(raw_result, stage_name, expected_keys)
if postprocess:
result = postprocess(result, stage_name)
return result
except StageOutputFormatError as exc:
last_error = exc
logger.warning(
"{stage} 输出结构异常(第 {attempt}/{total} 次),将尝试修复或重试: {error}",
stage=stage_name,
attempt=attempt,
total=self._STRUCTURAL_RETRY_ATTEMPTS,
error=exc,
)
if attempt >= self._STRUCTURAL_RETRY_ATTEMPTS:
break
raise last_error # type: ignore[misc]
def _ensure_mapping(
self,
value: Any,
context: str,
expected_keys: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""
确保阶段输出为dict;若返回列表则尝试提取最佳匹配元素。
"""
if isinstance(value, dict):
return value
if isinstance(value, list):
candidates = [item for item in value if isinstance(item, dict)]
if candidates:
best = candidates[0]
if expected_keys:
candidates.sort(
key=lambda item: sum(1 for key in expected_keys if key in item),
reverse=True,
)
best = candidates[0]
logger.warning(
"{context} 返回列表,已自动提取包含最多预期键的元素继续执行",
context=context,
)
return best
raise StageOutputFormatError(f"{context} 返回列表但缺少可用的对象元素")
if value is None:
raise StageOutputFormatError(f"{context} 返回空结果")
raise StageOutputFormatError(
f"{context} 返回类型 {type(value).__name__},期望字典"
)
def _normalize_word_plan(self, word_plan: Dict[str, Any], stage_name: str) -> Dict[str, Any]:
"""
清洗篇幅规划结果,确保 chapters/globalGuidelines/totalWords 类型安全。
"""
raw_chapters = word_plan.get("chapters", [])
if isinstance(raw_chapters, dict):
chapters_iterable = raw_chapters.values()
elif isinstance(raw_chapters, list):
chapters_iterable = raw_chapters
else:
chapters_iterable = []
normalized: List[Dict[str, Any]] = []
for idx, entry in enumerate(chapters_iterable):
if isinstance(entry, dict):
normalized.append(entry)
continue
if isinstance(entry, list):
dict_candidate = next((item for item in entry if isinstance(item, dict)), None)
if dict_candidate:
logger.warning(
"{stage} 第 {idx} 个章节条目为列表,已提取首个对象用于后续流程",
stage=stage_name,
idx=idx + 1,
)
normalized.append(dict_candidate)
continue
logger.warning(
"{stage} 跳过无法解析的章节条目#{idx}(类型: {type_name})",
stage=stage_name,
idx=idx + 1,
type_name=type(entry).__name__,
)
if not normalized:
raise StageOutputFormatError(f"{stage_name} 缺少有效的章节规划,无法继续")
word_plan["chapters"] = normalized
guidelines = word_plan.get("globalGuidelines")
if not isinstance(guidelines, list):
if guidelines is None or guidelines == "":
word_plan["globalGuidelines"] = []
else:
logger.warning(
"{stage} globalGuidelines 类型异常,已转换为列表封装",
stage=stage_name,
)
word_plan["globalGuidelines"] = [guidelines]
if not isinstance(word_plan.get("totalWords"), (int, float)):
logger.warning(
"{stage} totalWords 类型异常,使用默认值 10000",
stage=stage_name,
)
word_plan["totalWords"] = 10000
return word_plan
def _finalize_sparse_chapter(self, chapter: Optional[Dict[str, Any]]) -> Dict[str, Any]:
"""
构造内容稀疏兜底章节:复制原始payload并插入温馨提示段落。
"""
safe_chapter = deepcopy(chapter or {})
if not isinstance(safe_chapter, dict):
safe_chapter = {}
self._ensure_sparse_warning_block(safe_chapter)
return safe_chapter
def _ensure_sparse_warning_block(self, chapter: Dict[str, Any]) -> None:
"""
将提示段落插在章节标题后,提醒读者该章字数偏少。
"""
warning_block = {
"type": "paragraph",
"inlines": [
{
"text": self._CONTENT_SPARSE_WARNING_TEXT,
"marks": [{"type": "italic"}],
}
],
"meta": {"role": "content-sparse-warning"},
}
blocks = chapter.get("blocks")
if isinstance(blocks, list) and blocks:
inserted = False
for idx, block in enumerate(blocks):
if isinstance(block, dict) and block.get("type") == "heading":
blocks.insert(idx + 1, warning_block)
inserted = True
break
if not inserted:
blocks.insert(0, warning_block)
else:
chapter["blocks"] = [warning_block]
meta = chapter.get("meta")
if isinstance(meta, dict):
meta["contentSparseWarning"] = True
else:
chapter["meta"] = {"contentSparseWarning": True}
def _stringify(self, value: Any) -> str:
"""
安全地将对象转成字符串。
- dict/list 统一序列化为格式化 JSON,便于提示词消费;
- 其他类型走 `str()`,None 则返回空串,避免 None 传播。
参数:
value: 任意Python对象。
返回:
str: 适配提示词/日志的字符串表现。
"""
if value is None:
return ""
if isinstance(value, str):
return value
if isinstance(value, (dict, list)):
try:
return json.dumps(value, ensure_ascii=False, indent=2)
except Exception:
return str(value)
return str(value)
def _default_theme_tokens(self) -> Dict[str, Any]:
"""
构造默认主题变量,供渲染器/LLM共用。
当布局节点未返回专属配色时使用该套色板,保持报告风格统一。
返回:
dict: 包含颜色、字体、间距、布尔开关等渲染参数的主题字典。
"""
return {
"colors": {
"bg": "#f8f9fa",
"text": "#212529",
"primary": "#007bff",
"secondary": "#6c757d",
"card": "#ffffff",
"border": "#dee2e6",
"accent1": "#17a2b8",
"accent2": "#28a745",
"accent3": "#ffc107",
"accent4": "#dc3545",
},
"fonts": {
"body": "-apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, 'Helvetica Neue', Arial, 'Noto Sans', sans-serif",
"heading": "'Source Han Sans SC', 'PingFang SC', 'Microsoft YaHei', sans-serif",
},
"spacing": {"container": "1200px", "gutter": "24px"},
"vars": {
"header_sticky": True,
"toc_depth": 3,
"enable_dark_mode": True,
},
}
def _build_template_overview(
self,
template_markdown: str,
sections: List[TemplateSection],
) -> Dict[str, Any]:
"""
提取模板标题与章节骨架,供设计/篇幅规划统一引用。
同时记录章节ID/slug/order等辅助字段,保证多节点对齐。
参数:
template_markdown: 模板原文,用于解析全局标题。
sections: `TemplateSection` 列表,作为章节骨架。
返回:
dict: 包含模板标题与章节元数据的概览结构。
"""
fallback_title = sections[0].title if sections else ""
overview = {
"title": self._extract_template_title(template_markdown, fallback_title),
"chapters": [],
}
for section in sections:
overview["chapters"].append(
{
"chapterId": section.chapter_id,
"title": section.title,
"rawTitle": section.raw_title,
"number": section.number,
"slug": section.slug,
"order": section.order,
"depth": section.depth,
"outline": section.outline,
}
)
return overview
@staticmethod
def _extract_template_title(template_markdown: str, fallback: str = "") -> str:
"""
尝试从Markdown中提取首个标题。
优先返回首个 `#` 语法标题;如果模板首行就是正文,则回退到
第一行非空文本或调用方提供的 fallback。
参数:
template_markdown: 模板原文。
fallback: 备用标题,当文档缺少显式标题时使用。
返回:
str: 解析到的标题文本。
"""
for line in template_markdown.splitlines():
stripped = line.strip()
if not stripped:
continue
if stripped.startswith("#"):
return stripped.lstrip("#").strip()
if stripped:
fallback = fallback or stripped
return fallback or "智能舆情分析报告"
def _get_fallback_template_content(self) -> str:
"""
获取备用模板内容。
当模板目录不可用或LLM选择失败时使用该 Markdown 模板,
保证后续流程仍能给出结构化章节。
"""
return """# 社会公共热点事件分析报告
## 执行摘要
本报告针对当前社会热点事件进行综合分析,整合了多方信息源的观点和数据。
## 事件概况
### 基本信息
- 事件性质:{event_nature}
- 发生时间:{event_time}
- 涉及范围:{event_scope}
## 舆情态势分析
### 整体趋势
{sentiment_analysis}
### 主要观点分布
{opinion_distribution}
## 媒体报道分析
### 主流媒体态度
{media_analysis}
### 报道重点
{report_focus}
## 社会影响评估
### 直接影响
{direct_impact}
### 潜在影响
{potential_impact}
## 应对建议
### 即时措施
{immediate_actions}
### 长期策略
{long_term_strategy}
## 结论与展望
{conclusion}
---
*报告类型:社会公共热点事件分析*
*生成时间:{generation_time}*
"""
def _save_report(self, html_content: str, document_ir: Dict[str, Any], report_id: str) -> Dict[str, Any]:
"""
保存HTML与IR到文件并返回路径信息。
生成基于查询和时间戳的易读文件名,同时也把运行态的
`ReportState` 写入 JSON,方便下游排障或断点续跑。
参数:
html_content: 渲染后的HTML正文。
document_ir: Document IR结构化数据。
report_id: 当前任务ID,用于创建独立文件名。
返回:
dict: 记录HTML/IR/State文件的绝对与相对路径信息。
"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
query_safe = "".join(
c for c in self.state.metadata.query if c.isalnum() or c in (" ", "-", "_")
).rstrip()
query_safe = query_safe.replace(" ", "_")[:30] or "report"
html_filename = f"final_report_{query_safe}_{timestamp}.html"
html_path = Path(self.config.OUTPUT_DIR) / html_filename
html_path.write_text(html_content, encoding="utf-8")
html_abs = str(html_path.resolve())
html_rel = os.path.relpath(html_abs, os.getcwd())
ir_path = self._save_document_ir(document_ir, query_safe, timestamp)
ir_abs = str(ir_path.resolve())
ir_rel = os.path.relpath(ir_abs, os.getcwd())
state_filename = f"report_state_{query_safe}_{timestamp}.json"
state_path = Path(self.config.OUTPUT_DIR) / state_filename
self.state.save_to_file(str(state_path))
state_abs = str(state_path.resolve())
state_rel = os.path.relpath(state_abs, os.getcwd())
logger.info(f"HTML报告已保存: {html_path}")
logger.info(f"Document IR已保存: {ir_path}")
logger.info(f"状态已保存到: {state_path}")
return {
'report_filename': html_filename,
'report_filepath': html_abs,
'report_relative_path': html_rel,
'ir_filename': ir_path.name,
'ir_filepath': ir_abs,
'ir_relative_path': ir_rel,
'state_filename': state_filename,
'state_filepath': state_abs,
'state_relative_path': state_rel,
}
def _save_document_ir(self, document_ir: Dict[str, Any], query_safe: str, timestamp: str) -> Path:
"""
将整本IR写入独立目录。
`Document IR` 与 HTML 解耦保存,便于调试渲染差异以及
在不重新跑 LLM 的情况下再次渲染或导出其他格式。
参数:
document_ir: 整本报告的IR结构。
query_safe: 已清洗的查询短语,用于文件命名。
timestamp: 运行时间戳,保证文件名唯一。
返回:
Path: 指向保存后的IR文件路径。
"""
filename = f"report_ir_{query_safe}_{timestamp}.json"
ir_path = Path(self.config.DOCUMENT_IR_OUTPUT_DIR) / filename
ir_path.write_text(
json.dumps(document_ir, ensure_ascii=False, indent=2),
encoding="utf-8",
)
return ir_path
def _persist_planning_artifacts(
self,
run_dir: Path,
layout_design: Dict[str, Any],
word_plan: Dict[str, Any],
template_overview: Dict[str, Any],
):
"""
将文档设计稿、篇幅规划与模板概览另存成JSON。
这些中间件文件(document_layout/word_plan/template_overview)
方便在调试或复盘时快速定位:标题/目录/主题是如何确定的、
字数分配有什么要求,以便后续人工校正。
参数:
run_dir: 章节输出根目录。
layout_design: 文档布局节点的原始输出。
word_plan: 篇幅规划节点输出。
template_overview: 模板概览JSON。
"""
artifacts = {
"document_layout": layout_design,
"word_plan": word_plan,
"template_overview": template_overview,
}
for name, payload in artifacts.items():
if not payload:
continue
path = run_dir / f"{name}.json"
try:
path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
except Exception as exc:
logger.warning(f"写入{name}失败: {exc}")
def get_progress_summary(self) -> Dict[str, Any]:
"""获取进度摘要,直接返回可序列化的状态字典供API层查询。"""
return self.state.to_dict()
def load_state(self, filepath: str):
"""从文件加载状态并覆盖当前state,便于断点恢复。"""
self.state = ReportState.load_from_file(filepath)
logger.info(f"状态已从 {filepath} 加载")
def save_state(self, filepath: str):
"""保存状态到文件,通常用于任务完成后的分析与备份。"""
self.state.save_to_file(filepath)
logger.info(f"状态已保存到 {filepath}")
def check_input_files(self, insight_dir: str, media_dir: str, query_dir: str, forum_log_path: str) -> Dict[str, Any]:
"""
检查输入文件是否准备就绪(基于文件数量增加)。
Args:
insight_dir: InsightEngine报告目录
media_dir: MediaEngine报告目录
query_dir: QueryEngine报告目录
forum_log_path: 论坛日志文件路径
Returns:
检查结果字典,包含文件计数、缺失列表、最新文件路径等
"""
# 检查各个报告目录的文件数量变化
directories = {
'insight': insight_dir,
'media': media_dir,
'query': query_dir
}
# 使用文件基准管理器检查新文件
check_result = self.file_baseline.check_new_files(directories)
# 检查论坛日志
forum_ready = os.path.exists(forum_log_path)
# 构建返回结果
result = {
'ready': check_result['ready'] and forum_ready,
'baseline_counts': check_result['baseline_counts'],
'current_counts': check_result['current_counts'],
'new_files_found': check_result['new_files_found'],
'missing_files': [],
'files_found': [],
'latest_files': {}
}
# 构建详细信息
for engine, new_count in check_result['new_files_found'].items():
current_count = check_result['current_counts'][engine]
baseline_count = check_result['baseline_counts'].get(engine, 0)
if new_count > 0:
result['files_found'].append(f"{engine}: {current_count}个文件 (新增{new_count}个)")
else:
result['missing_files'].append(f"{engine}: {current_count}个文件 (基准{baseline_count}个,无新增)")
# 检查论坛日志
if forum_ready:
result['files_found'].append(f"forum: {os.path.basename(forum_log_path)}")
else:
result['missing_files'].append("forum: 日志文件不存在")
# 获取最新文件路径(用于实际报告生成)
if result['ready']:
result['latest_files'] = self.file_baseline.get_latest_files(directories)
if forum_ready:
result['latest_files']['forum'] = forum_log_path
return result
def load_input_files(self, file_paths: Dict[str, str]) -> Dict[str, Any]:
"""
加载输入文件内容
Args:
file_paths: 文件路径字典
Returns:
加载的内容字典,包含 `reports` 列表与 `forum_logs` 字符串
"""
content = {
'reports': [],
'forum_logs': ''
}
# 加载报告文件
engines = ['query', 'media', 'insight']
for engine in engines:
if engine in file_paths:
try:
with open(file_paths[engine], 'r', encoding='utf-8') as f:
report_content = f.read()
content['reports'].append(report_content)
logger.info(f"已加载 {engine} 报告: {len(report_content)} 字符")
except Exception as e:
logger.exception(f"加载 {engine} 报告失败: {str(e)}")
content['reports'].append("")
# 加载论坛日志
if 'forum' in file_paths:
try:
with open(file_paths['forum'], 'r', encoding='utf-8') as f:
content['forum_logs'] = f.read()
logger.info(f"已加载论坛日志: {len(content['forum_logs'])} 字符")
except Exception as e:
logger.exception(f"加载论坛日志失败: {str(e)}")
return content
def create_agent(config_file: Optional[str] = None) -> ReportAgent:
"""
创建Report Agent实例的便捷函数。
Args:
config_file: 配置文件路径
Returns:
ReportAgent实例
目前以环境变量驱动 `Settings`,保留 `config_file` 参数便于未来扩展。
"""
config = Settings() # 以空配置初始化,而从从环境变量初始化
return ReportAgent(config)