test_analysis_run_store.py
4.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from __future__ import annotations
from datetime import datetime, timedelta
from pathlib import Path
from uuid import uuid4
from services.application.analysis.run_store import InMemoryAnalysisRunStore, JsonAnalysisRunStore
from services.shared.models import AnalysisRunStatus
def _make_store_path() -> Path:
workspace_temp_root = Path.cwd() / "var" / "test-artifacts"
workspace_temp_root.mkdir(parents=True, exist_ok=True)
return workspace_temp_root / f"analysis-run-store-{uuid4().hex}.json"
def test_json_analysis_run_store_round_trips_runs_across_instances():
store_path = _make_store_path()
try:
first_store = JsonAnalysisRunStore(store_path)
base_time = datetime(2026, 1, 1, 9, 0, 0)
older_run = first_store.create_run(
research_task_id="task-1",
query="museum research",
engines=["insight", "query"],
)
older_run.created_at = base_time
older_run.status = AnalysisRunStatus.PARTIAL
older_run.partial_results["insight"] = {"success": True, "message": "done"}
older_run.metrics["success_count"] = 1
older_run.metrics["failure_count"] = 1
older_run.legacy_payload["source"] = "test"
first_store.save_run(older_run)
newer_run = first_store.create_run(
research_task_id="task-1",
query="city research",
engines=["query"],
)
newer_run.created_at = base_time + timedelta(hours=2)
newer_run.status = AnalysisRunStatus.COMPLETED
newer_run.legacy_payload["source"] = "history"
first_store.save_run(newer_run)
unrelated_run = first_store.create_run(
research_task_id="task-2",
query="other task",
engines=["query"],
)
unrelated_run.created_at = base_time + timedelta(hours=1)
first_store.save_run(unrelated_run)
second_store = JsonAnalysisRunStore(store_path)
reloaded = second_store.get_run(older_run.id)
listed_runs = second_store.list_runs_for_task("task-1")
assert reloaded is not None
assert reloaded.id == older_run.id
assert reloaded.research_task_id == "task-1"
assert reloaded.status == AnalysisRunStatus.PARTIAL
assert reloaded.partial_results["insight"]["success"] is True
assert reloaded.metrics["success_count"] == 1
assert reloaded.metrics["failure_count"] == 1
assert reloaded.legacy_payload["source"] == "test"
assert [run.id for run in listed_runs] == [newer_run.id, older_run.id]
assert [run.legacy_payload["source"] for run in listed_runs] == ["history", "test"]
finally:
store_path.unlink(missing_ok=True)
def test_json_analysis_run_store_returns_none_for_missing_or_invalid_payload():
store_path = _make_store_path()
try:
store_path.write_text('{"runs": "broken"}', encoding="utf-8")
store = JsonAnalysisRunStore(store_path)
assert store.get_run("missing") is None
assert store.list_runs_for_task("missing-task") == []
finally:
store_path.unlink(missing_ok=True)
def test_in_memory_analysis_run_store_lists_runs_for_task_in_descending_created_order():
store = InMemoryAnalysisRunStore()
base_time = datetime(2026, 1, 1, 9, 0, 0)
older_run = store.create_run(
research_task_id="task-1",
query="museum research",
engines=["insight"],
)
older_run.created_at = base_time
older_run.legacy_payload["source"] = "older"
store.save_run(older_run)
newer_run = store.create_run(
research_task_id="task-1",
query="city research",
engines=["query"],
)
newer_run.created_at = base_time + timedelta(hours=1)
newer_run.legacy_payload["source"] = "newer"
store.save_run(newer_run)
other_task_run = store.create_run(
research_task_id="task-2",
query="other task",
engines=["query"],
)
other_task_run.created_at = base_time + timedelta(hours=2)
store.save_run(other_task_run)
listed_runs = store.list_runs_for_task("task-1")
assert [run.id for run in listed_runs] == [newer_run.id, older_run.id]
assert [run.legacy_payload["source"] for run in listed_runs] == ["newer", "older"]
def test_in_memory_analysis_run_store_returns_empty_list_for_missing_task():
store = InMemoryAnalysisRunStore()
assert store.list_runs_for_task("missing-task") == []