# test_task_runtime.py
from __future__ import annotations

from datetime import datetime, timedelta

from services.engines.report.task_runtime import ReportTaskRuntimeStore
from services.shared.dto import ReportJobDTO


class _FakeTask:
    """Minimal task double carrying an id, two timestamps and a status.

    ``updated_at`` falls back to ``created_at`` when it is omitted (or falsy).
    """

    def __init__(
        self,
        task_id: str,
        *,
        created_at: datetime,
        updated_at: datetime | None = None,
        status: str = "completed",
    ) -> None:
        self.task_id = task_id
        self.status = status
        self.created_at = created_at
        # Same fallback as ``updated_at or created_at``.
        self.updated_at = updated_at if updated_at else created_at

    def to_dto(self) -> ReportJobDTO:
        """Build a ``ReportJobDTO`` describing this task as an HTML report job."""
        report_path = f"reports/{self.task_id}.html"
        artifacts = {
            # The report file counts as ready only for completed tasks.
            "report_file_ready": self.status == "completed",
            "report_file_path": report_path,
        }
        return ReportJobDTO(
            id=self.task_id,
            research_task_id=f"research-{self.task_id}",
            status=self.status,
            output_path=report_path,
            output_format="html",
            artifacts=artifacts,
            last_action=f"task {self.task_id} {self.status}",
            created_at=self.created_at.isoformat(),
            updated_at=self.updated_at.isoformat(),
        )


def test_get_task_prefers_current_task_over_registry_and_loader():
    """get_task returns the current task object and never calls the loader.

    A second task with the same id is registered (not as current) so that the
    lookup has a competing registry entry to choose from.
    """
    reference_time = datetime(2026, 4, 23, 10, 0, 0)
    seen_by_loader: list[str] = []

    def recording_loader(task_id):
        # Remember every id the store asks for and report "nothing persisted".
        seen_by_loader.append(task_id)
        return None

    active = _FakeTask("task-current", created_at=reference_time, status="running")
    older_duplicate = _FakeTask(
        "task-current",
        created_at=reference_time - timedelta(minutes=5),
    )

    store = ReportTaskRuntimeStore[_FakeTask](persisted_loader=recording_loader)
    store.set_current_task(active)
    store.register_task(older_duplicate, set_as_current=False)

    found = store.get_task("task-current")

    assert found is active
    assert seen_by_loader == []


def test_get_task_loads_persisted_task_and_prunes_old_history():
    """An unknown id is resolved through the loader and history is trimmed.

    With ``max_history=2`` and two tasks already in the registry, fetching
    "task-3" must return the loader's object, leave "task-2" and "task-3" in
    ``list_tasks()``, and hand "task-1" to the persisted deleter.
    """
    start = datetime(2026, 4, 23, 9, 0, 0)
    removed_ids: list[str] = []
    persisted = _FakeTask("task-3", created_at=start + timedelta(minutes=20))

    def load_persisted(task_id):
        # Only "task-3" exists in the fake persistence layer.
        if task_id == "task-3":
            return persisted
        return None

    initial_registry = [
        _FakeTask("task-1", created_at=start),
        _FakeTask("task-2", created_at=start + timedelta(minutes=10)),
    ]

    store = ReportTaskRuntimeStore[_FakeTask](
        max_history=2,
        persisted_loader=load_persisted,
        persisted_deleter=removed_ids.append,
    )
    store.replace_registry(initial_registry)

    found = store.get_task("task-3")

    assert found is persisted
    remaining_ids = [entry.task_id for entry in store.list_tasks()]
    assert remaining_ids == ["task-2", "task-3"]
    assert removed_ids == ["task-1"]


def test_snapshot_tasks_returns_current_and_orders_history_by_updated_at():
    """snapshot_tasks reports the current task and lists "task-2" before "task-1".

    "task-2" carries the later ``updated_at`` and is installed as the current
    task; only its entry in the snapshot may be flagged ``is_current``.
    """
    start = datetime(2026, 4, 23, 8, 0, 0)
    earlier = _FakeTask(
        "task-1",
        created_at=start,
        updated_at=start + timedelta(minutes=5),
    )
    latest = _FakeTask(
        "task-2",
        created_at=start + timedelta(minutes=1),
        updated_at=start + timedelta(minutes=15),
        status="running",
    )

    store = ReportTaskRuntimeStore[_FakeTask]()
    store.replace_registry([earlier, latest], current_task=latest)

    snapshot = store.snapshot_tasks(limit=5)

    assert snapshot["current_task"]["task_id"] == "task-2"
    listed_ids = [entry["task_id"] for entry in snapshot["tasks"]]
    assert listed_ids == ["task-2", "task-1"]
    assert snapshot["tasks"][0]["is_current"] is True
    assert snapshot["tasks"][1]["is_current"] is False


def test_clear_current_task_only_clears_when_task_id_matches():
    """clear_current_task is a no-op (False) for a foreign id, and clears (True) for the matching one."""
    reference_time = datetime(2026, 4, 24, 9, 0, 0)
    active = _FakeTask("task-current", created_at=reference_time, status="running")

    store = ReportTaskRuntimeStore[_FakeTask]()
    store.set_current_task(active)

    # A non-matching id must leave the current task in place.
    cleared_other = store.clear_current_task(task_id="task-other")
    assert cleared_other is False
    assert store.get_current_task() is active

    # The matching id removes it.
    cleared_match = store.clear_current_task(task_id="task-current")
    assert cleared_match is True
    assert store.get_current_task() is None