# test_analysis_query_service.py
from __future__ import annotations

from datetime import datetime, timedelta

from services.application.analysis import AnalysisRunQueryService, InMemoryAnalysisRunStore
from services.shared.dto import ResearchTaskDTO
from services.shared.models import AnalysisRunStatus, ErrorInfo


def test_get_analysis_run_reads_back_existing_run():
    """A run created through the store can be fetched by id via the query service."""
    store = InMemoryAnalysisRunStore()
    query_service = AnalysisRunQueryService(analysis_run_store=store)
    created = store.create_run(
        research_task_id="task-query-service",
        query="museum query",
        engines=["query"],
    )

    fetched = query_service.get_analysis_run(created.id)

    assert fetched is not None
    # Both identity and task ownership must survive the round trip.
    assert (fetched.id, fetched.research_task_id) == (created.id, "task-query-service")


def test_get_analysis_run_returns_none_for_blank_id():
    """Empty and whitespace-only ids resolve to ``None`` instead of a run."""
    query_service = AnalysisRunQueryService(analysis_run_store=InMemoryAnalysisRunStore())

    for blank_id in ("", "   "):
        assert query_service.get_analysis_run(blank_id) is None


def test_get_analysis_run_dto_returns_response_ready_payload():
    """The DTO view of a run carries its id and serializes it under ``run_id``."""
    store = InMemoryAnalysisRunStore()
    query_service = AnalysisRunQueryService(analysis_run_store=store)
    created = store.create_run(
        research_task_id="task-query-service",
        query="museum query",
        engines=["query"],
    )

    run_dto = query_service.get_analysis_run_dto(created.id)
    assert run_dto is not None
    assert run_dto.id == created.id

    response_item = run_dto.to_response_item()
    assert response_item["run_id"] == created.id


def test_get_task_analysis_run_prefers_linked_run_for_same_task():
    """An explicitly linked run of the task wins over a newer run of the same task."""
    task_id = "task-query-service"
    store = InMemoryAnalysisRunStore()
    query_service = AnalysisRunQueryService(analysis_run_store=store)

    # The linked run is deliberately the *older* one, so recency alone
    # would pick the wrong run.
    linked = store.create_run(research_task_id=task_id, query="older query", engines=["query"])
    linked.created_at = datetime(2026, 1, 1, 9, 0, 0)
    linked.legacy_payload["label"] = "older"
    linked = store.save_run(linked)

    latest = store.create_run(research_task_id=task_id, query="newer query", engines=["insight"])
    latest.created_at = datetime(2026, 1, 1, 10, 0, 0)
    latest = store.save_run(latest)

    resolved = query_service.get_task_analysis_run(
        research_task_id=task_id,
        analysis_run_id=linked.id,
    )

    assert resolved is not None
    assert resolved.id == linked.id
    assert resolved.legacy_payload["label"] == "older"
    assert resolved.id != latest.id


def test_get_task_analysis_run_falls_back_to_latest_run_when_link_is_missing():
    """Resolve to the task's newest run when the linked id is unusable.

    Two kinds of unusable link are covered:

    * an id that belongs to a run of a *different* task. That foreign run is
      pinned as the globally newest run, so only task filtering, not
      recency, keeps it out of the result;
    * an id that does not exist in the store at all.

    Both must fall back to the newest run of the requested task.
    """
    run_store = InMemoryAnalysisRunStore()
    service = AnalysisRunQueryService(analysis_run_store=run_store)
    base_time = datetime(2026, 1, 1, 9, 0, 0)

    older_run = run_store.create_run(
        research_task_id="task-query-service",
        query="older query",
        engines=["query"],
    )
    older_run.created_at = base_time
    run_store.save_run(older_run)

    newer_run = run_store.create_run(
        research_task_id="task-query-service",
        query="newer query",
        engines=["insight"],
    )
    newer_run.created_at = base_time + timedelta(hours=1)
    newer_run.legacy_payload["label"] = "latest"
    newer_run = run_store.save_run(newer_run)

    foreign_run = run_store.create_run(
        research_task_id="other-task",
        query="foreign query",
        engines=["media"],
    )
    # Newer than every run of the target task: a resolver that ignored the
    # task id and just took the latest run would wrongly return this one.
    foreign_run.created_at = base_time + timedelta(hours=2)
    foreign_run = run_store.save_run(foreign_run)

    loaded_run = service.get_task_analysis_run(
        research_task_id="task-query-service",
        analysis_run_id=foreign_run.id,
    )

    assert loaded_run is not None
    assert loaded_run.id == newer_run.id
    assert loaded_run.research_task_id == "task-query-service"
    assert loaded_run.legacy_payload["label"] == "latest"

    # A link that points at nothing must take the same fallback path.
    missing_link_run = service.get_task_analysis_run(
        research_task_id="task-query-service",
        analysis_run_id="analysis-missing",
    )

    assert missing_link_run is not None
    assert missing_link_run.id == newer_run.id


def test_list_task_analysis_runs_returns_history_for_task():
    """Only the requested task's runs are listed, newest first."""
    task_id = "task-query-service"
    store = InMemoryAnalysisRunStore()
    query_service = AnalysisRunQueryService(analysis_run_store=store)
    t0 = datetime(2026, 1, 1, 9, 0, 0)

    earlier = store.create_run(research_task_id=task_id, query="first query", engines=["query"])
    earlier.created_at = t0
    earlier = store.save_run(earlier)

    later = store.create_run(research_task_id=task_id, query="second query", engines=["insight"])
    later.created_at = t0 + timedelta(hours=1)
    later = store.save_run(later)

    # A run of another task must not leak into this task's history.
    unrelated = store.create_run(research_task_id="other-task", query="other query", engines=["media"])
    store.save_run(unrelated)

    history = query_service.list_task_analysis_runs(research_task_id=task_id)
    listed_ids = [item.id for item in history]

    assert listed_ids == [later.id, earlier.id]


def test_list_task_analysis_run_dtos_returns_latest_first_response_items():
    """DTO listing is newest-first and each item serializes its own id.

    ``created_at`` is pinned explicitly, as in the non-DTO history test, so
    the expected order is set by the fixture. Otherwise the order would
    depend on the clock advancing between two back-to-back ``create_run``
    calls (presumably the store stamps the current time on creation). On
    coarse-resolution clocks the two runs could tie, and the order would
    then depend on the store's tie-breaking.
    """
    run_store = InMemoryAnalysisRunStore()
    service = AnalysisRunQueryService(analysis_run_store=run_store)
    base_time = datetime(2026, 1, 1, 9, 0, 0)

    first_run = run_store.create_run(
        research_task_id="task-query-service",
        query="first query",
        engines=["query"],
    )
    first_run.created_at = base_time
    first_run = run_store.save_run(first_run)

    second_run = run_store.create_run(
        research_task_id="task-query-service",
        query="second query",
        engines=["insight"],
    )
    second_run.created_at = base_time + timedelta(hours=1)
    second_run = run_store.save_run(second_run)

    analysis_runs = service.list_task_analysis_run_dtos(research_task_id="task-query-service")

    assert [run.id for run in analysis_runs] == [second_run.id, first_run.id]
    assert [run.to_response_item()["id"] for run in analysis_runs] == [second_run.id, first_run.id]


def test_get_task_analysis_resource_dto_returns_current_run_and_history():
    """The resource DTO exposes the linked run as current plus newest-first history.

    ``created_at`` is pinned on both runs so the expected history order
    ``[current, older]`` is set by the fixture. Otherwise it would depend on
    whether the clock advanced between the two ``create_run`` calls
    (presumably the store stamps the current time on creation; confirm
    against the store).
    """
    run_store = InMemoryAnalysisRunStore()
    service = AnalysisRunQueryService(analysis_run_store=run_store)
    base_time = datetime(2026, 1, 1, 9, 0, 0)

    older_run = run_store.create_run(
        research_task_id="task-query-service",
        query="older query",
        engines=["query"],
    )
    older_run.created_at = base_time
    older_run = run_store.save_run(older_run)

    current_run = run_store.create_run(
        research_task_id="task-query-service",
        query="current query",
        engines=["insight"],
    )
    current_run.created_at = base_time + timedelta(hours=1)
    current_run = run_store.save_run(current_run)

    dto = service.get_task_analysis_resource_dto(
        research_task_id="task-query-service",
        analysis_run_id=current_run.id,
    )

    assert dto.task_id == "task-query-service"
    assert dto.analysis_run_id == current_run.id
    assert dto.current_run is not None
    assert dto.current_run.id == current_run.id
    assert [run.id for run in dto.runs] == [current_run.id, older_run.id]
    assert dto.summary["current_run_id"] == current_run.id
    assert dto.history["linked_run_is_current"] is True


def test_build_task_analysis_resource_payload_returns_current_run_and_history():
    """The dict payload mirrors the resource DTO and counts runs that have results.

    Only ``second_run`` carries a partial result, so ``runs_with_results``
    must be 1. ``created_at`` is pinned on both runs so the expected
    ``[second, first]`` history order does not depend on the wall clock
    advancing between the two ``create_run`` calls.
    """
    run_store = InMemoryAnalysisRunStore()
    service = AnalysisRunQueryService(analysis_run_store=run_store)
    base_time = datetime(2026, 1, 1, 9, 0, 0)

    first_run = run_store.create_run(
        research_task_id="task-query-service",
        query="first query",
        engines=["query"],
    )
    first_run.created_at = base_time
    first_run = run_store.save_run(first_run)

    second_run = run_store.create_run(
        research_task_id="task-query-service",
        query="second query",
        engines=["insight"],
    )
    second_run.created_at = base_time + timedelta(hours=1)
    second_run.partial_results["insight"] = {
        "success": True,
        "status": "completed",
        "summary": "insight complete",
        "artifacts": {"themes": ["service"]},
        "metrics": {"duration_seconds": 1.8},
    }
    second_run = run_store.save_run(second_run)

    payload = service.build_task_analysis_resource_payload(
        research_task_id="task-query-service",
        analysis_run_id=second_run.id,
    )

    assert payload["task_id"] == "task-query-service"
    assert payload["analysis"]["analysis_run_id"] == second_run.id
    assert payload["analysis"]["current_run"]["id"] == second_run.id
    assert [item["id"] for item in payload["analysis"]["runs"]] == [second_run.id, first_run.id]
    assert payload["analysis"]["summary"]["current_run_id"] == second_run.id
    assert payload["analysis"]["history"]["linked_run_is_current"] is True
    assert payload["analysis"]["stats"]["runs_with_results"] == 1


def test_build_task_analysis_resource_payload_aggregates_stats_for_resolved_current_run():
    """With a dangling link, the payload resolves a current run and aggregates stats.

    The link ``"analysis-missing"`` matches no run, so the payload must
    report it as unavailable and resolve the running run as current. Stats
    aggregate over both runs of the task: status counts, active/terminal
    counts and the sorted union of engines.

    Both runs get explicit ``created_at`` values (running 09:00 > failed
    08:00). Previously only the failed run was pinned and the running run
    kept its default timestamp (presumably "now"), so which run was newer
    depended on the host clock being past 2026-01-01 08:00.
    """
    run_store = InMemoryAnalysisRunStore()
    service = AnalysisRunQueryService(analysis_run_store=run_store)

    running_run = run_store.create_run(
        research_task_id="task-query-service",
        query="running query",
        engines=["query", "insight"],
    )
    running_run.created_at = datetime(2026, 1, 1, 9, 0, 0)
    running_run.status = AnalysisRunStatus.RUNNING
    running_run.partial_results["query"] = {
        "success": True,
        "status": "completed",
        "summary": "query complete",
        "artifacts": {"sources": ["reviews"]},
        "metrics": {"duration_seconds": 0.9},
    }
    running_run = run_store.save_run(running_run)

    failed_run = run_store.create_run(
        research_task_id="task-query-service",
        query="failed query",
        engines=["media"],
    )
    failed_run.created_at = datetime(2026, 1, 1, 8, 0, 0)
    failed_run.status = AnalysisRunStatus.FAILED
    failed_run.error = ErrorInfo(message="failed")
    failed_run = run_store.save_run(failed_run)

    payload = service.build_task_analysis_resource_payload(
        research_task_id="task-query-service",
        analysis_run_id="analysis-missing",
        task=ResearchTaskDTO(
            id="task-query-service",
            status="analyzing",
            last_action="Task fallback action",
            error={"message": "Task fallback error"},
            updated_at="2026-04-16T12:00:00",
        ),
    )

    assert payload["analysis"]["current_run"]["id"] == running_run.id
    assert payload["analysis"]["summary"]["linked_run_id"] == "analysis-missing"
    assert payload["analysis"]["history"]["linked_run_available"] is False
    assert payload["analysis"]["history"]["linked_run_is_current"] is False
    assert payload["analysis"]["stats"]["status_counts"]["running"] == 1
    assert payload["analysis"]["stats"]["status_counts"]["failed"] == 1
    assert payload["analysis"]["stats"]["active_run_count"] == 1
    assert payload["analysis"]["stats"]["terminal_run_count"] == 1
    assert payload["analysis"]["stats"]["unique_engines"] == ["insight", "media", "query"]
    assert payload["analysis"]["current_run"]["partial_results"]["query"]["engine_name"] == "query"


def test_build_task_analysis_resource_payload_normalizes_current_run_legacy_results_and_failure_stats():
    """Legacy-shaped partial results on the current run come back normalized.

    The run stores a bare string for ``query`` and an un-wrapped dict for
    ``insight``. The payload must present both as structured engine results
    (status/success/summary/error/artifacts/metrics) and count exactly one
    success and one failure.
    """
    task_id = "task-query-service"
    timeout_message = "timeout while waiting for upstream reviews"
    store = InMemoryAnalysisRunStore()
    query_service = AnalysisRunQueryService(analysis_run_store=store)

    run = store.create_run(
        research_task_id=task_id,
        query="mixed results query",
        engines=["query", "insight"],
    )
    run.status = AnalysisRunStatus.PARTIAL
    run.last_action = "One engine failed while insight completed"
    run.error = ErrorInfo(message="1 engine failed")
    run.partial_results["query"] = timeout_message
    run.partial_results["insight"] = {"themes": ["service"], "duration_seconds": 1.5}
    run = store.save_run(run)

    analysis = query_service.build_task_analysis_resource_payload(
        research_task_id=task_id,
        analysis_run_id=run.id,
    )["analysis"]

    # Run-level summary and aggregate counters.
    assert analysis["status"] == "partial"
    assert analysis["summary"]["error"]["message"] == "1 engine failed"
    assert analysis["summary"]["current_result_count"] == 2
    stats = analysis["stats"]
    assert stats["current_success_result_count"] == 1
    assert stats["current_failed_result_count"] == 1

    results = analysis["current_run"]["partial_results"]
    query_result, insight_result = results["query"], results["insight"]

    # A bare string result is treated as a failure message.
    assert query_result["engine_name"] == "query"
    assert query_result["status"] == "failed"
    assert query_result["success"] is False
    assert query_result["summary"] == timeout_message
    assert query_result["error"]["code"] == "engine_execution_failed"
    assert query_result["error"]["message"] == timeout_message

    # A raw dict result is treated as a success, split into artifacts/metrics.
    assert insight_result["engine_name"] == "insight"
    assert insight_result["status"] == "completed"
    assert insight_result["success"] is True
    assert insight_result["summary"] == "insight completed"
    assert insight_result["artifacts"]["themes"] == ["service"]
    assert insight_result["metrics"]["duration_seconds"] == 1.5