test_monitor.py
16.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
"""
测试论坛日志监控模块中的日志解析函数
测试各种日志格式下的解析能力,包括:
1. 旧格式:[HH:MM:SS]
2. 新格式:loguru默认格式 (YYYY-MM-DD HH:mm:ss.SSS | LEVEL | ...)
3. 只应当接收FirstSummaryNode、ReflectionSummaryNode等SummaryNode的输出,不应当接收SearchNode的输出
"""
import sys
from tempfile import TemporaryDirectory
from pathlib import Path
# 添加项目根目录到路径
project_root = Path(__file__).resolve().parents[3]
sys.path.insert(0, str(project_root))
from services.engines.forum.monitor import LogMonitor
from tests.fixtures import forum_log_test_data as test_data
class TestLogMonitor:
"""测试LogMonitor的日志解析功能"""
def setup_method(self):
"""每个测试方法前的初始化"""
self._temp_dir = TemporaryDirectory(prefix="forum-monitor-tests-")
self.monitor = LogMonitor(log_dir=self._temp_dir.name)
def teardown_method(self):
"""每个测试方法后的清理。"""
self._temp_dir.cleanup()
def test_is_target_log_line_old_format(self):
"""测试旧格式的目标节点识别"""
# 应该识别包含FirstSummaryNode的行
assert self.monitor.is_target_log_line(test_data.OLD_FORMAT_FIRST_SUMMARY) is True
# 应该识别包含ReflectionSummaryNode的行
assert (
self.monitor.is_target_log_line(test_data.OLD_FORMAT_REFLECTION_SUMMARY)
is True
)
# 不应该识别非目标节点
assert self.monitor.is_target_log_line(test_data.OLD_FORMAT_NON_TARGET) is False
def test_is_target_log_line_new_format(self):
"""测试新格式的目标节点识别"""
# 应该识别包含FirstSummaryNode的行
assert self.monitor.is_target_log_line(test_data.NEW_FORMAT_FIRST_SUMMARY) is True
# 应该识别包含ReflectionSummaryNode的行
assert (
self.monitor.is_target_log_line(test_data.NEW_FORMAT_REFLECTION_SUMMARY)
is True
)
# 不应该识别非目标节点
assert self.monitor.is_target_log_line(test_data.NEW_FORMAT_NON_TARGET) is False
def test_is_json_start_line_old_format(self):
"""测试旧格式的JSON开始行识别"""
assert self.monitor.is_json_start_line(test_data.OLD_FORMAT_SINGLE_LINE_JSON) is True
assert (
self.monitor.is_json_start_line(test_data.OLD_FORMAT_MULTILINE_JSON[0])
is True
)
assert self.monitor.is_json_start_line(test_data.OLD_FORMAT_NON_TARGET) is False
def test_is_json_start_line_new_format(self):
"""测试新格式的JSON开始行识别"""
assert self.monitor.is_json_start_line(test_data.NEW_FORMAT_SINGLE_LINE_JSON) is True
assert (
self.monitor.is_json_start_line(test_data.NEW_FORMAT_MULTILINE_JSON[0])
is True
)
assert self.monitor.is_json_start_line(test_data.NEW_FORMAT_NON_TARGET) is False
def test_is_json_end_line(self):
"""测试JSON结束行识别"""
assert self.monitor.is_json_end_line("}") is True
assert self.monitor.is_json_end_line("] }") is True
assert self.monitor.is_json_end_line("[17:42:31] }") is False # 需要先清理时间戳
assert (
self.monitor.is_json_end_line(
"2025-11-05 17:42:31.289 | INFO | module:function:133 - }"
)
is False
) # 需要先清理时间戳
def test_extract_json_content_old_format_single_line(self):
"""测试旧格式单行JSON提取"""
lines = [test_data.OLD_FORMAT_SINGLE_LINE_JSON]
result = self.monitor.extract_json_content(lines)
assert result is not None
assert "这是首次总结内容" in result
def test_extract_json_content_new_format_single_line(self):
"""测试新格式单行JSON提取"""
lines = [test_data.NEW_FORMAT_SINGLE_LINE_JSON]
result = self.monitor.extract_json_content(lines)
assert result is not None
assert "这是首次总结内容" in result
def test_extract_json_content_old_format_multiline(self):
"""测试旧格式多行JSON提取"""
result = self.monitor.extract_json_content(test_data.OLD_FORMAT_MULTILINE_JSON)
assert result is not None
assert "多行" in result
assert "JSON内容" in result
def test_extract_json_content_new_format_multiline(self):
"""测试新格式多行JSON提取(支持loguru格式的时间戳移除)"""
result = self.monitor.extract_json_content(test_data.NEW_FORMAT_MULTILINE_JSON)
assert result is not None
assert "多行" in result
assert "JSON内容" in result
def test_extract_json_content_updated_priority(self):
"""测试updated_paragraph_latest_state优先提取"""
result = self.monitor.extract_json_content(test_data.COMPLEX_JSON_WITH_UPDATED)
assert result is not None
assert "更新版" in result
assert "核心发现" in result
def test_extract_json_content_paragraph_only(self):
"""测试只有paragraph_latest_state的情况"""
result = self.monitor.extract_json_content(test_data.COMPLEX_JSON_WITH_PARAGRAPH)
assert result is not None
assert "首次总结" in result or "核心发现" in result
def test_format_json_content(self):
"""测试JSON内容格式化"""
# 测试updated_paragraph_latest_state优先
json_obj = {
"updated_paragraph_latest_state": "更新后的内容",
"paragraph_latest_state": "首次内容"
}
result = self.monitor.format_json_content(json_obj)
assert result == "更新后的内容"
# 测试只有paragraph_latest_state
json_obj = {
"paragraph_latest_state": "首次内容"
}
result = self.monitor.format_json_content(json_obj)
assert result == "首次内容"
# 测试都没有的情况
json_obj = {"other_field": "其他内容"}
result = self.monitor.format_json_content(json_obj)
assert "清理后的输出" in result
def test_extract_node_content_old_format(self):
"""测试旧格式的节点内容提取"""
line = "[17:42:31] [INSIGHT] [FirstSummaryNode] 清理后的输出: 这是测试内容"
result = self.monitor.extract_node_content(line)
assert result is not None
assert "测试内容" in result
def test_extract_node_content_new_format(self):
"""测试新格式的节点内容提取"""
line = "2025-11-05 17:42:31.287 | INFO | InsightEngine.nodes.summary_node:process_output:131 - FirstSummaryNode 清理后的输出: 这是测试内容"
result = self.monitor.extract_node_content(line)
assert result is not None
assert "测试内容" in result
def test_process_lines_for_json_old_format(self):
"""测试旧格式的完整处理流程"""
lines = [
test_data.OLD_FORMAT_NON_TARGET, # 应该被忽略
test_data.OLD_FORMAT_MULTILINE_JSON[0],
test_data.OLD_FORMAT_MULTILINE_JSON[1],
test_data.OLD_FORMAT_MULTILINE_JSON[2],
]
result = self.monitor.process_lines_for_json(lines, "insight")
assert len(result) > 0
assert any("多行" in content for content in result)
def test_process_lines_for_json_new_format(self):
"""测试新格式的完整处理流程"""
lines = [
test_data.NEW_FORMAT_NON_TARGET, # 应该被忽略
test_data.NEW_FORMAT_MULTILINE_JSON[0],
test_data.NEW_FORMAT_MULTILINE_JSON[1],
test_data.NEW_FORMAT_MULTILINE_JSON[2],
]
result = self.monitor.process_lines_for_json(lines, "insight")
assert len(result) > 0
assert any("多行" in content for content in result)
assert any("JSON内容" in content for content in result)
def test_process_lines_for_json_mixed_format(self):
"""测试混合格式的处理"""
result = self.monitor.process_lines_for_json(test_data.MIXED_FORMAT_LINES, "insight")
assert len(result) > 0
assert any("混合格式内容" in content for content in result)
def test_is_valuable_content(self):
"""测试有价值内容的判断"""
# 包含"清理后的输出"应该是有价值的
assert self.monitor.is_valuable_content(test_data.OLD_FORMAT_SINGLE_LINE_JSON) is True
# 排除短小提示信息
assert self.monitor.is_valuable_content("JSON解析成功") is False
assert self.monitor.is_valuable_content("成功生成") is False
# 空行应该被过滤
assert self.monitor.is_valuable_content("") is False
def test_extract_json_content_real_query_engine(self):
"""测试QueryEngine实际生产环境日志提取"""
result = self.monitor.extract_json_content(test_data.REAL_QUERY_ENGINE_REFLECTION)
assert result is not None
assert "洛阳栾川钼业集团" in result
assert "CMOC" in result
assert "updated_paragraph_latest_state" not in result # 应该已经提取内容,不包含字段名
def test_extract_json_content_real_insight_engine(self):
"""测试InsightEngine实际生产环境日志提取(包含标识行)"""
# 先测试能否识别标识行
assert self.monitor.is_target_log_line(test_data.REAL_INSIGHT_ENGINE_REFLECTION[0]) is True # 包含"正在生成反思总结"
assert self.monitor.is_target_log_line(test_data.REAL_INSIGHT_ENGINE_REFLECTION[1]) is True # 包含nodes.summary_node
# 测试JSON提取(从第二行开始,因为第一行是标识行)
json_lines = test_data.REAL_INSIGHT_ENGINE_REFLECTION[1:] # 跳过标识行
result = self.monitor.extract_json_content(json_lines)
assert result is not None
assert "核心发现" in result
assert "更新版" in result
assert "洛阳钼业2025年第三季度" in result
def test_extract_json_content_real_media_engine(self):
"""测试MediaEngine实际生产环境日志提取(单行JSON)"""
# MediaEngine是单行JSON格式,需要先分割成行
lines = test_data.REAL_MEDIA_ENGINE_REFLECTION.split("\n")
# 测试能否识别标识行
assert self.monitor.is_target_log_line(lines[0]) is True # 包含"正在生成反思总结"
assert self.monitor.is_target_log_line(lines[1]) is True # 包含nodes.summary_node和"清理后的输出"
# 测试JSON提取(从包含JSON的行开始)
json_line = lines[1] # 第二行包含完整的单行JSON
result = self.monitor.extract_json_content([json_line])
assert result is not None
assert "综合信息概览" in result
assert "洛阳钼业" in result
assert "updated_paragraph_latest_state" not in result # 应该已经提取内容
def test_process_lines_for_json_real_query_engine(self):
"""测试QueryEngine实际日志的完整处理流程"""
result = self.monitor.process_lines_for_json(test_data.REAL_QUERY_ENGINE_REFLECTION, "query")
assert len(result) > 0
assert any("洛阳栾川钼业集团" in content for content in result)
def test_process_lines_for_json_real_insight_engine(self):
"""测试InsightEngine实际日志的完整处理流程(包含标识行)"""
result = self.monitor.process_lines_for_json(test_data.REAL_INSIGHT_ENGINE_REFLECTION, "insight")
assert len(result) > 0
assert any("核心发现" in content for content in result)
assert any("更新版" in content for content in result)
def test_process_lines_for_json_real_media_engine(self):
"""测试MediaEngine实际日志的完整处理流程(单行JSON)"""
# 将单行字符串分割成多行
lines = test_data.REAL_MEDIA_ENGINE_REFLECTION.split('\n')
result = self.monitor.process_lines_for_json(lines, "media")
assert len(result) > 0
assert any("综合信息概览" in content for content in result)
assert any("洛阳钼业" in content for content in result)
def test_filter_search_node_output(self):
"""测试过滤SearchNode的输出(重要:SearchNode不应进入论坛)"""
# SearchNode的输出包含"清理后的输出: {",但不包含目标节点模式
search_lines = test_data.SEARCH_NODE_FIRST_SEARCH
result = self.monitor.process_lines_for_json(search_lines, "insight")
# SearchNode的输出应该被过滤,不应该被捕获
assert len(result) == 0
def test_filter_search_node_output_single_line(self):
"""测试过滤SearchNode的单行JSON输出"""
# SearchNode的单行JSON格式
search_line = test_data.SEARCH_NODE_REFLECTION_SEARCH
result = self.monitor.process_lines_for_json([search_line], "insight")
# SearchNode的输出应该被过滤
assert len(result) == 0
def test_search_node_vs_summary_node_mixed(self):
"""测试混合场景:SearchNode和SummaryNode同时存在,只捕获SummaryNode"""
lines = [
# SearchNode输出(应该被过滤)
"[11:16:35] 2025-11-06 11:16:35.567 | INFO | InsightEngine.nodes.search_node:process_output:97 - 清理后的输出: {",
"[11:16:35] \"search_query\": \"测试查询\"",
"[11:16:35] }",
# SummaryNode输出(应该被捕获)
"[11:17:05] 2025-11-06 11:17:05.547 | INFO | InsightEngine.nodes.summary_node:process_output:131 - 清理后的输出: {",
"[11:17:05] \"paragraph_latest_state\": \"这是总结内容\"",
"[11:17:05] }",
]
result = self.monitor.process_lines_for_json(lines, "insight")
# 应该只捕获SummaryNode的输出,不包含SearchNode的输出
assert len(result) > 0
assert any("总结内容" in content for content in result)
# 确保不包含搜索查询内容
assert not any("search_query" in content for content in result)
assert not any("测试查询" in content for content in result)
def test_filter_error_logs_from_summary_node(self):
"""测试过滤SummaryNode的错误日志(重要:错误日志不应进入论坛)"""
# JSON解析失败错误日志
assert self.monitor.is_target_log_line(test_data.SUMMARY_NODE_JSON_ERROR) == False
# JSON修复失败错误日志
assert self.monitor.is_target_log_line(test_data.SUMMARY_NODE_JSON_FIX_ERROR) == False
# ERROR级别日志
assert self.monitor.is_target_log_line(test_data.SUMMARY_NODE_ERROR_LOG) == False
# Traceback错误日志
for line in test_data.SUMMARY_NODE_TRACEBACK.split('\n'):
assert self.monitor.is_target_log_line(line) == False
def test_error_logs_not_captured(self):
"""测试错误日志不会被捕获到论坛"""
error_lines = [
test_data.SUMMARY_NODE_JSON_ERROR,
test_data.SUMMARY_NODE_JSON_FIX_ERROR,
test_data.SUMMARY_NODE_ERROR_LOG,
]
for line in error_lines:
result = self.monitor.process_lines_for_json([line], "media")
# 错误日志不应该被捕获
assert len(result) == 0
def test_mixed_valid_and_error_logs(self):
"""测试混合场景:有效日志和错误日志同时存在,只捕获有效日志"""
lines = [
# 错误日志(应该被过滤)
test_data.SUMMARY_NODE_JSON_ERROR,
test_data.SUMMARY_NODE_JSON_FIX_ERROR,
# 有效SummaryNode输出(应该被捕获)
"[11:55:31] 2025-11-06 11:55:31.762 | INFO | MediaEngine.nodes.summary_node:process_output:134 - 清理后的输出: {",
"[11:55:31] \"paragraph_latest_state\": \"这是有效的总结内容\"",
"[11:55:31] }",
]
result = self.monitor.process_lines_for_json(lines, "media")
# 应该只捕获有效日志,不包含错误日志
assert len(result) > 0
assert any("有效的总结内容" in content for content in result)
# 确保不包含错误信息
assert not any("JSON解析失败" in content for content in result)
assert not any("JSON修复失败" in content for content in result)
def run_tests():
"""运行所有测试"""
import pytest
# 运行测试
pytest.main([__file__, "-v"])
if __name__ == "__main__":
run_tests()