test_task_runtime_store.py 3.28 KB
from __future__ import annotations

from apps.web_api.runtime.task_runtime_store import TaskRuntimeStore
from services.shared.models.common import ErrorInfo, ProgressInfo


def test_upsert_returns_clone_safe_snapshot():
    store = TaskRuntimeStore()
    snapshot = store.upsert(
        "task-1",
        status="analyzing",
        generated_query="museum research",
        analysis_run_id="analysis-1",
        last_action="accepted",
        progress=ProgressInfo(stage="queued", current=0, total=2, percent=0.0, message="accepted"),
        engines=["query", "insight"],
        partial_results={"query": {"success": True}},
        metrics={"engine_count": 2},
    )

    snapshot.engines.append("mutated")
    snapshot.partial_results["media"] = {"success": False}

    stored = store.get_task("task-1")
    assert stored is not None
    assert stored.research_task_id == "task-1"
    assert stored.status == "analyzing"
    assert stored.generated_query == "museum research"
    assert stored.analysis_run_id == "analysis-1"
    assert stored.engines == ["query", "insight"]
    assert stored.partial_results["query"]["engine_name"] == "query"
    assert stored.partial_results["query"]["success"] is True
    assert stored.partial_results["query"]["status"] == "completed"
    assert stored.metrics == {"engine_count": 2}


def test_upsert_can_clear_one_linked_id_without_dropping_others():
    store = TaskRuntimeStore()
    store.upsert(
        "task-2",
        crawler_job_id="crawl-2",
        analysis_run_id="analysis-2",
        report_job_id="report-2",
    )

    updated = store.upsert(
        "task-2",
        analysis_run_id=None,
        error=ErrorInfo(message="analysis failed", retryable=True),
    )

    assert updated.crawler_job_id == "crawl-2"
    assert updated.analysis_run_id is None
    assert updated.report_job_id == "report-2"
    assert updated.error is not None
    assert updated.error.message == "analysis failed"


def test_clear_task_and_snapshot_all_follow_store_state():
    store = TaskRuntimeStore()
    store.upsert("task-a", status="ready")
    store.upsert("task-b", status="reporting")

    snapshot_all = store.snapshot_all()
    snapshot_all["task-a"].status = "mutated"

    assert sorted(store.snapshot_all()) == ["task-a", "task-b"]
    assert store.get_task("task-a").status == "ready"
    assert store.clear_task("task-a") is True
    assert store.clear_task("task-a") is False
    assert list(store.snapshot_all()) == ["task-b"]


def test_upsert_normalizes_legacy_partial_results_to_engine_result_payloads():
    store = TaskRuntimeStore()

    snapshot = store.upsert(
        "task-legacy",
        partial_results={
            "query": "timeout",
            "insight": {
                "success": True,
                "summary": "insight complete",
            },
        },
    )

    assert snapshot.partial_results["query"]["engine_name"] == "query"
    assert snapshot.partial_results["query"]["success"] is False
    assert snapshot.partial_results["query"]["status"] == "failed"
    assert snapshot.partial_results["query"]["error"]["message"] == "timeout"
    assert snapshot.partial_results["insight"]["engine_name"] == "insight"
    assert snapshot.partial_results["insight"]["success"] is True
    assert snapshot.partial_results["insight"]["summary"] == "insight complete"