# test_crawler_query_service.py
from __future__ import annotations

from services.application.crawler.query_service import CrawlerJobQueryService


def test_list_crawler_jobs_filters_to_crawl_history_and_preserves_order():
    """Jobs payload lists only crawl-kind history entries, in stored order.

    The stored history interleaves a login entry between two crawl entries
    while the live crawl state is idle. The assertions expect the login entry
    to be absent from ``jobs``, the first crawl entry to be reported as
    ``current_job``, newline-separated keywords to come back as a list, and
    the failed crawl's message to be exposed under ``error.message``.
    """

    def idle_crawl_state():
        return {"status": "idle"}

    def stored_history():
        # Rebuilt on every call so each read receives fresh dictionaries.
        latest_crawl = {
            "id": "crawl-2",
            "kind": "crawl",
            "platform": "xiaohongshu",
            "status": "completed",
            "message": "latest crawl",
            "config": {"keywords": "museum\ncoffee"},
            "created_at": "2026-04-16T12:10:00",
        }
        login_entry = {
            "id": "login-1",
            "kind": "login",
            "platform": "xiaohongshu",
            "status": "logged_in",
            "message": "login ok",
        }
        older_failed_crawl = {
            "id": "crawl-1",
            "kind": "crawl",
            "platform": "dianping",
            "status": "error",
            "message": "crawl failed",
            "config": {"keywords": ["tea", "dessert"]},
            "created_at": "2026-04-16T11:10:00",
        }
        return {"history": [latest_crawl, login_entry, older_failed_crawl]}

    query_service = CrawlerJobQueryService(
        crawl_state_getter=idle_crawl_state,
        storage_state_getter=stored_history,
    )

    jobs_payload = query_service.build_crawler_jobs_payload()

    assert jobs_payload["current_job"]["id"] == "crawl-2"

    listed_jobs = jobs_payload["jobs"]
    assert [entry["id"] for entry in listed_jobs] == ["crawl-2", "crawl-1"]
    assert listed_jobs[0]["keywords"] == ["museum", "coffee"]
    assert listed_jobs[1]["error"]["message"] == "crawl failed"


def test_current_crawler_job_prefers_active_history_entry():
    """Current job takes its details from the matching history entry.

    The live crawl state and the stored history both describe job
    ``crawl-active`` but carry different messages. The test expects
    ``last_action`` to be the history entry's message rather than the live
    state's one.
    """

    def running_crawl_state():
        return {
            "status": "running",
            "platform": "xiaohongshu",
            "platform_label": "Xiaohongshu",
            "crawler_type": "search",
            "started_at": "2026-04-16T12:20:00",
            "message": "running crawl",
            "history_id": "crawl-active",
            "current_config": {"keywords": "museum"},
        }

    def stored_history():
        active_entry = {
            "id": "crawl-active",
            "kind": "crawl",
            "platform": "xiaohongshu",
            "platform_label": "Xiaohongshu",
            "status": "running",
            "message": "active from history",
            "config": {"keywords": "museum"},
            "created_at": "2026-04-16T12:20:00",
        }
        return {"history": [active_entry]}

    query_service = CrawlerJobQueryService(
        crawl_state_getter=running_crawl_state,
        storage_state_getter=stored_history,
    )

    current_job = query_service.get_current_crawler_job()

    assert current_job is not None
    assert current_job.id == "crawl-active"
    assert current_job.last_action == "active from history"
    assert current_job.keywords == ["museum"]


def test_current_crawler_job_returns_none_when_history_is_empty_and_state_is_idle():
    """With an idle crawl state and no history, there is no current job."""

    def idle_crawl_state():
        return {"status": "idle"}

    def empty_history():
        return {"history": []}

    query_service = CrawlerJobQueryService(
        crawl_state_getter=idle_crawl_state,
        storage_state_getter=empty_history,
    )

    current_job = query_service.get_current_crawler_job()

    assert current_job is None


def test_get_crawler_job_returns_history_entry_when_present():
    """Looking up a job id found in the stored history returns that entry.

    The live crawl state is idle, so the only place ``crawl-history`` appears
    is the stored history; the returned job is expected to mirror its
    platform, status and keyword list.
    """

    def idle_crawl_state():
        return {"status": "idle"}

    def stored_history():
        completed_entry = {
            "id": "crawl-history",
            "kind": "crawl",
            "platform": "dianping",
            "platform_label": "Dianping",
            "status": "completed",
            "message": "history crawl",
            "config": {"keywords": ["tea", "dessert"]},
            "created_at": "2026-04-16T10:00:00",
        }
        return {"history": [completed_entry]}

    query_service = CrawlerJobQueryService(
        crawl_state_getter=idle_crawl_state,
        storage_state_getter=stored_history,
    )

    found_job = query_service.get_crawler_job("crawl-history")

    assert found_job is not None
    assert found_job.id == "crawl-history"
    assert found_job.platform == "dianping"
    assert found_job.status == "completed"
    assert found_job.keywords == ["tea", "dessert"]


def test_get_crawler_job_returns_current_live_job_when_it_matches_requested_id():
    """Looking up the live job's id returns it even when history is empty.

    The stored history has no entries; the requested id equals the live crawl
    state's ``history_id``. The returned job is expected to carry the live
    state's platform and status, with its single keyword wrapped in a list.
    """

    def running_crawl_state():
        return {
            "status": "running",
            "platform": "weibo",
            "platform_label": "Weibo",
            "crawler_type": "search",
            "started_at": "2026-04-16T12:30:00",
            "message": "live crawl only",
            "history_id": "crawl-live",
            "current_config": {"keywords": "concert"},
        }

    def empty_history():
        return {"history": []}

    query_service = CrawlerJobQueryService(
        crawl_state_getter=running_crawl_state,
        storage_state_getter=empty_history,
    )

    live_job = query_service.get_crawler_job("crawl-live")

    assert live_job is not None
    assert live_job.id == "crawl-live"
    assert live_job.platform == "weibo"
    assert live_job.status == "running"
    assert live_job.keywords == ["concert"]


def test_get_crawler_job_falls_back_to_live_state_when_history_entry_is_missing():
    """Single-job payload is built from live state when history lacks the id.

    The stored history is empty and the requested id equals the live crawl
    state's ``history_id``. The payload's ``crawler_job`` entry is expected
    to be populated with the live state's platform, status and keywords.
    """

    def running_crawl_state():
        return {
            "status": "running",
            "platform": "weibo",
            "platform_label": "Weibo",
            "crawler_type": "search",
            "started_at": "2026-04-16T12:30:00",
            "message": "live crawl only",
            "history_id": "crawl-live",
            "current_config": {"keywords": "concert"},
        }

    def empty_history():
        return {"history": []}

    query_service = CrawlerJobQueryService(
        crawl_state_getter=running_crawl_state,
        storage_state_getter=empty_history,
    )

    job_payload = query_service.build_crawler_job_payload("crawl-live")

    crawler_job = job_payload["crawler_job"]
    assert crawler_job is not None
    assert crawler_job["id"] == "crawl-live"
    assert crawler_job["platform"] == "weibo"
    assert crawler_job["status"] == "running"
    assert crawler_job["keywords"] == ["concert"]