# test_analysis_run_store.py
from __future__ import annotations

from datetime import datetime, timedelta
from pathlib import Path
from uuid import uuid4

from services.application.analysis.run_store import InMemoryAnalysisRunStore, JsonAnalysisRunStore
from services.shared.models import AnalysisRunStatus


def _make_store_path() -> Path:
    """Return a fresh, unique JSON file path for a store under test.

    The path lives in ``var/test-artifacts`` below the current working
    directory. That directory is created when absent; the file itself is
    not created here.
    """
    artifacts_dir = Path.cwd().joinpath("var", "test-artifacts")
    artifacts_dir.mkdir(parents=True, exist_ok=True)
    # A random hex suffix keeps separate calls from sharing a file.
    file_name = f"analysis-run-store-{uuid4().hex}.json"
    return artifacts_dir / file_name


def test_json_analysis_run_store_round_trips_runs_across_instances():
    """Runs saved through one JSON store are visible through a second one.

    Two runs for ``task-1`` and one for ``task-2`` are written via the first
    store instance. A second instance opened on the same file is expected to
    return the earlier ``task-1`` run with its saved fields, and to list only
    the ``task-1`` runs with the later ``created_at`` first.
    """
    store_path = _make_store_path()
    try:
        writer = JsonAnalysisRunStore(store_path)
        t0 = datetime(2026, 1, 1, 9, 0, 0)

        early_run = writer.create_run(
            research_task_id="task-1", query="museum research", engines=["insight", "query"]
        )
        early_run.created_at = t0
        early_run.status = AnalysisRunStatus.PARTIAL
        early_run.partial_results["insight"] = {"success": True, "message": "done"}
        for metric_name in ("success_count", "failure_count"):
            early_run.metrics[metric_name] = 1
        early_run.legacy_payload["source"] = "test"
        writer.save_run(early_run)

        late_run = writer.create_run(
            research_task_id="task-1", query="city research", engines=["query"]
        )
        late_run.created_at = t0 + timedelta(hours=2)
        late_run.status = AnalysisRunStatus.COMPLETED
        late_run.legacy_payload["source"] = "history"
        writer.save_run(late_run)

        # A run for a different task, timestamped between the two above; the
        # listing assertions below expect it to be absent.
        other_run = writer.create_run(
            research_task_id="task-2", query="other task", engines=["query"]
        )
        other_run.created_at = t0 + timedelta(hours=1)
        writer.save_run(other_run)

        # Read back through a separate instance bound to the same file.
        reader = JsonAnalysisRunStore(store_path)
        reloaded = reader.get_run(early_run.id)
        listed_runs = reader.list_runs_for_task("task-1")

        assert reloaded is not None
        assert reloaded.id == early_run.id
        assert reloaded.research_task_id == "task-1"
        assert reloaded.status == AnalysisRunStatus.PARTIAL
        assert reloaded.partial_results["insight"]["success"] is True
        for metric_name in ("success_count", "failure_count"):
            assert reloaded.metrics[metric_name] == 1
        assert reloaded.legacy_payload["source"] == "test"

        listed_ids = [run.id for run in listed_runs]
        assert listed_ids == [late_run.id, early_run.id]
        listed_sources = [run.legacy_payload["source"] for run in listed_runs]
        assert listed_sources == ["history", "test"]
    finally:
        # Remove the artifact whether or not the assertions passed.
        store_path.unlink(missing_ok=True)


def test_json_analysis_run_store_returns_none_for_missing_or_invalid_payload():
    """A store file whose ``runs`` entry is a string yields no runs.

    With that malformed payload on disk, ``get_run`` is expected to return
    ``None`` and ``list_runs_for_task`` an empty list.
    """
    store_path = _make_store_path()
    try:
        malformed_payload = '{"runs": "broken"}'
        store_path.write_text(malformed_payload, encoding="utf-8")

        store = JsonAnalysisRunStore(store_path)

        looked_up = store.get_run("missing")
        assert looked_up is None

        listed = store.list_runs_for_task("missing-task")
        assert listed == []
    finally:
        # Remove the artifact whether or not the assertions passed.
        store_path.unlink(missing_ok=True)


def test_in_memory_analysis_run_store_lists_runs_for_task_in_descending_created_order():
    """Listing a task's runs from the in-memory store puts the newest first.

    Two runs are saved for ``task-1`` and one, with the latest ``created_at``,
    for ``task-2``. The listing for ``task-1`` is expected to hold only its
    own two runs, later ``created_at`` first.
    """
    store = InMemoryAnalysisRunStore()
    t0 = datetime(2026, 1, 1, 9, 0, 0)

    early_run = store.create_run(
        research_task_id="task-1", query="museum research", engines=["insight"]
    )
    early_run.created_at = t0
    early_run.legacy_payload["source"] = "older"
    store.save_run(early_run)

    late_run = store.create_run(
        research_task_id="task-1", query="city research", engines=["query"]
    )
    late_run.created_at = t0 + timedelta(hours=1)
    late_run.legacy_payload["source"] = "newer"
    store.save_run(late_run)

    # Newest overall, but filed under another task.
    foreign_run = store.create_run(
        research_task_id="task-2", query="other task", engines=["query"]
    )
    foreign_run.created_at = t0 + timedelta(hours=2)
    store.save_run(foreign_run)

    listed_runs = store.list_runs_for_task("task-1")

    listed_ids = [run.id for run in listed_runs]
    assert listed_ids == [late_run.id, early_run.id]
    listed_sources = [run.legacy_payload["source"] for run in listed_runs]
    assert listed_sources == ["newer", "older"]


def test_in_memory_analysis_run_store_returns_empty_list_for_missing_task():
    """A newly constructed in-memory store lists no runs for an unused task id."""
    empty_store = InMemoryAnalysisRunStore()
    listed = empty_store.list_runs_for_task("missing-task")

    assert listed == []