test_task_crawler_resource_dto.py
2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from __future__ import annotations
from services.shared.dto import CrawlerJobDTO, ResearchTaskDTO, TaskCrawlerResourceDTO
def test_task_crawler_resource_dto_builds_summary_and_history():
    """Check the response payload built from a single running crawler job.

    The same job is supplied both as the current job and as the only entry
    in the job list; the test then inspects the top-level task id and the
    ``crawler`` section (identity, status, summary, and history flags).
    """
    running_job = CrawlerJobDTO(
        id="crawl-1",
        research_task_id="task-1",
        platform="xiaohongshu",
        keywords=["museum", "coffee"],
        status="running",
        last_action="crawler started",
        updated_at="2026-04-17T12:00:00",
    )

    resource = TaskCrawlerResourceDTO.from_parts(
        task_id="task-1",
        crawler_job_id="crawl-1",
        current_job=running_job,
        crawler_jobs=[running_job],
    )
    response = resource.to_response_payload()

    assert response["task_id"] == "task-1"

    # Identity and status of the linked crawler job.
    crawler_section = response["crawler"]
    assert crawler_section["crawler_job_id"] == "crawl-1"
    assert crawler_section["status"] == "running"
    assert crawler_section["current_job"]["id"] == "crawl-1"

    # Summary counts: one job in the list, two keywords on the current job.
    summary_section = crawler_section["summary"]
    assert summary_section["job_count"] == 1
    assert summary_section["current_keyword_count"] == 2

    # History flags for the linked job.
    history_section = crawler_section["history"]
    assert history_section["linked_job_available"] is True
    assert history_section["linked_job_is_current"] is True
def test_task_crawler_resource_dto_uses_task_fallbacks_when_no_jobs_exist():
    """Check the response payload when no crawler jobs are supplied.

    A draft research task is passed with ``current_job=None`` and an empty
    job list; the test then inspects the ``crawler`` section for the
    expected status, task-derived values, and history flags.
    """
    draft_task = ResearchTaskDTO(
        id="task-2",
        status="draft",
        last_action="waiting for crawler",
        updated_at="2026-04-17T12:30:00",
    )

    resource = TaskCrawlerResourceDTO.from_parts(
        task_id="task-2",
        crawler_job_id="crawl-missing",
        task=draft_task,
        current_job=None,
        crawler_jobs=[],
    )
    response = resource.to_response_payload()

    # With no jobs, the crawler reports "idle" while the task status is kept.
    crawler_section = response["crawler"]
    assert crawler_section["status"] == "idle"
    assert crawler_section["task_status"] == "draft"
    assert crawler_section["current_job"] is None
    assert crawler_section["jobs"] == []

    # The summary's last action matches the one set on the task.
    assert crawler_section["summary"]["last_action"] == "waiting for crawler"

    # History flags with an empty job list.
    history_section = crawler_section["history"]
    assert history_section["has_jobs"] is False
    assert history_section["linked_job_available"] is False