test_task_runtime_store.py
3.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from __future__ import annotations
from apps.web_api.runtime.task_runtime_store import TaskRuntimeStore
from services.shared.models.common import ErrorInfo, ProgressInfo
def test_upsert_returns_clone_safe_snapshot():
store = TaskRuntimeStore()
snapshot = store.upsert(
"task-1",
status="analyzing",
generated_query="museum research",
analysis_run_id="analysis-1",
last_action="accepted",
progress=ProgressInfo(stage="queued", current=0, total=2, percent=0.0, message="accepted"),
engines=["query", "insight"],
partial_results={"query": {"success": True}},
metrics={"engine_count": 2},
)
snapshot.engines.append("mutated")
snapshot.partial_results["media"] = {"success": False}
stored = store.get_task("task-1")
assert stored is not None
assert stored.research_task_id == "task-1"
assert stored.status == "analyzing"
assert stored.generated_query == "museum research"
assert stored.analysis_run_id == "analysis-1"
assert stored.engines == ["query", "insight"]
assert stored.partial_results["query"]["engine_name"] == "query"
assert stored.partial_results["query"]["success"] is True
assert stored.partial_results["query"]["status"] == "completed"
assert stored.metrics == {"engine_count": 2}
def test_upsert_can_clear_one_linked_id_without_dropping_others():
store = TaskRuntimeStore()
store.upsert(
"task-2",
crawler_job_id="crawl-2",
analysis_run_id="analysis-2",
report_job_id="report-2",
)
updated = store.upsert(
"task-2",
analysis_run_id=None,
error=ErrorInfo(message="analysis failed", retryable=True),
)
assert updated.crawler_job_id == "crawl-2"
assert updated.analysis_run_id is None
assert updated.report_job_id == "report-2"
assert updated.error is not None
assert updated.error.message == "analysis failed"
def test_clear_task_and_snapshot_all_follow_store_state():
store = TaskRuntimeStore()
store.upsert("task-a", status="ready")
store.upsert("task-b", status="reporting")
snapshot_all = store.snapshot_all()
snapshot_all["task-a"].status = "mutated"
assert sorted(store.snapshot_all()) == ["task-a", "task-b"]
assert store.get_task("task-a").status == "ready"
assert store.clear_task("task-a") is True
assert store.clear_task("task-a") is False
assert list(store.snapshot_all()) == ["task-b"]
def test_upsert_normalizes_legacy_partial_results_to_engine_result_payloads():
store = TaskRuntimeStore()
snapshot = store.upsert(
"task-legacy",
partial_results={
"query": "timeout",
"insight": {
"success": True,
"summary": "insight complete",
},
},
)
assert snapshot.partial_results["query"]["engine_name"] == "query"
assert snapshot.partial_results["query"]["success"] is False
assert snapshot.partial_results["query"]["status"] == "failed"
assert snapshot.partial_results["query"]["error"]["message"] == "timeout"
assert snapshot.partial_results["insight"]["engine_name"] == "insight"
assert snapshot.partial_results["insight"]["success"] is True
assert snapshot.partial_results["insight"]["summary"] == "insight complete"