test_task_runtime.py
3.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from __future__ import annotations
from datetime import datetime, timedelta
from services.engines.report.task_runtime import ReportTaskRuntimeStore
from services.shared.dto import ReportJobDTO
class _FakeTask:
def __init__(
self,
task_id: str,
*,
created_at: datetime,
updated_at: datetime | None = None,
status: str = "completed",
) -> None:
self.task_id = task_id
self.created_at = created_at
self.updated_at = updated_at or created_at
self.status = status
def to_dto(self) -> ReportJobDTO:
return ReportJobDTO(
id=self.task_id,
research_task_id=f"research-{self.task_id}",
status=self.status,
output_path=f"reports/{self.task_id}.html",
output_format="html",
artifacts={
"report_file_ready": self.status == "completed",
"report_file_path": f"reports/{self.task_id}.html",
},
last_action=f"task {self.task_id} {self.status}",
created_at=self.created_at.isoformat(),
updated_at=self.updated_at.isoformat(),
)
def test_get_task_prefers_current_task_over_registry_and_loader() -> None:
    """``get_task`` returns the current task and never consults the loader.

    A second task with the same id is registered (not as current) so the
    lookup has a competing registry entry; the identity assertion checks that
    the object set via ``set_current_task`` is the one returned.
    """
    now = datetime(2026, 4, 23, 10, 0, 0)
    current_task = _FakeTask("task-current", created_at=now, status="running")
    loader_calls: list[str] = []

    def _recording_loader(task_id: str) -> None:
        # Record every lookup and report "not found" so any loader use is
        # visible in ``loader_calls``.
        loader_calls.append(task_id)
        return None

    store = ReportTaskRuntimeStore[_FakeTask](
        persisted_loader=_recording_loader,
    )
    store.set_current_task(current_task)
    store.register_task(
        _FakeTask("task-current", created_at=now - timedelta(minutes=5)),
        set_as_current=False,
    )

    resolved = store.get_task("task-current")

    assert resolved is current_task
    assert loader_calls == []
def test_get_task_loads_persisted_task_and_prunes_old_history() -> None:
    """``get_task`` falls back to the loader and trims history to ``max_history``.

    The registry starts with two tasks and ``max_history=2``. Resolving the
    unknown id ``"task-3"`` must return the loader's object, leave only
    ``task-2`` and ``task-3`` in ``list_tasks()``, and pass ``"task-1"`` to
    the persisted deleter.
    """
    base_time = datetime(2026, 4, 23, 9, 0, 0)
    deleted_task_ids: list[str] = []
    loaded_task = _FakeTask("task-3", created_at=base_time + timedelta(minutes=20))
    store = ReportTaskRuntimeStore[_FakeTask](
        max_history=2,
        # The loader only knows "task-3"; every other id is reported missing.
        persisted_loader=lambda task_id: loaded_task if task_id == "task-3" else None,
        # Collect the ids the store asks to delete.
        persisted_deleter=deleted_task_ids.append,
    )
    store.replace_registry(
        [
            _FakeTask("task-1", created_at=base_time),
            _FakeTask("task-2", created_at=base_time + timedelta(minutes=10)),
        ]
    )

    resolved = store.get_task("task-3")

    assert resolved is loaded_task
    assert [task.task_id for task in store.list_tasks()] == ["task-2", "task-3"]
    assert deleted_task_ids == ["task-1"]
def test_snapshot_tasks_returns_current_and_orders_history_by_updated_at() -> None:
    """``snapshot_tasks`` reports the current task and lists the newest first.

    The snapshot is indexed as a mapping with ``"current_task"`` and
    ``"tasks"`` entries; each task entry exposes ``"task_id"`` and
    ``"is_current"``.
    """
    base_time = datetime(2026, 4, 23, 8, 0, 0)
    task_1 = _FakeTask(
        "task-1",
        created_at=base_time,
        updated_at=base_time + timedelta(minutes=5),
    )
    # NOTE(review): task-2 is newer by both created_at and updated_at, so this
    # fixture alone does not distinguish which timestamp drives the ordering.
    task_2 = _FakeTask(
        "task-2",
        created_at=base_time + timedelta(minutes=1),
        updated_at=base_time + timedelta(minutes=15),
        status="running",
    )
    store = ReportTaskRuntimeStore[_FakeTask]()
    store.replace_registry([task_1, task_2], current_task=task_2)

    snapshot = store.snapshot_tasks(limit=5)

    assert snapshot["current_task"]["task_id"] == "task-2"
    assert [item["task_id"] for item in snapshot["tasks"]] == ["task-2", "task-1"]
    assert snapshot["tasks"][0]["is_current"] is True
    assert snapshot["tasks"][1]["is_current"] is False
def test_clear_current_task_only_clears_when_task_id_matches() -> None:
    """``clear_current_task`` is a no-op unless ``task_id`` matches.

    A mismatching id must return ``False`` and leave the current task in
    place; the matching id must return ``True`` and leave no current task.
    """
    now = datetime(2026, 4, 24, 9, 0, 0)
    current_task = _FakeTask("task-current", created_at=now, status="running")
    store = ReportTaskRuntimeStore[_FakeTask]()
    store.set_current_task(current_task)

    # Wrong id: nothing is cleared.
    assert store.clear_current_task(task_id="task-other") is False
    assert store.get_current_task() is current_task

    # Matching id: the current task is dropped.
    assert store.clear_current_task(task_id="task-current") is True
    assert store.get_current_task() is None