test_stream_runtime.py
2.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from queue import Empty
from services.engines.report.stream_runtime import (
STREAM_HEARTBEAT_INTERVAL,
STREAM_IDLE_TIMEOUT,
STREAM_TERMINAL_STATUSES,
TaskEventStream,
StreamSubscriberRegistry,
build_heartbeat_event,
format_sse,
)
def test_constants_match_report_stream_contract():
    """Pin the heartbeat interval, idle timeout and terminal-status constants."""
    expected_terminal_statuses = {"completed", "error", "cancelled"}

    assert STREAM_HEARTBEAT_INTERVAL == 15
    assert STREAM_IDLE_TIMEOUT == 120
    assert STREAM_TERMINAL_STATUSES == expected_terminal_statuses
def test_registry_register_broadcast_and_unregister():
    """Check that two subscribers of one task both get a broadcast event.

    After both subscribers are unregistered, the registry must report a
    subscriber count of zero for that task.
    """
    task_id = "task-1"
    registry = StreamSubscriberRegistry()
    first_subscriber = registry.register(task_id)
    second_subscriber = registry.register(task_id)
    subscribers = (first_subscriber, second_subscriber)
    message = {"id": 1, "type": "progress", "payload": {"value": 50}}

    delivered_count = registry.broadcast(task_id, message)

    assert delivered_count == 2
    # Drain in registration order, exactly one event per subscriber.
    for subscriber in subscribers:
        assert subscriber.get_nowait() == message

    for subscriber in subscribers:
        registry.unregister(task_id, subscriber)
    assert registry.subscriber_count(task_id) == 0
def test_task_event_stream_publishes_history_and_calls_broadcaster():
    """Check event ids, broadcaster invocation and history lookups.

    Only the first publish is given a broadcaster, so exactly one
    ``(task_id, event)`` pair is expected to be recorded.
    """
    broadcast_calls = []

    def fixed_timestamp():
        return "2026-04-23T00:00:00Z"

    def record_broadcast(task_id, event):
        broadcast_calls.append((task_id, event))

    stream = TaskEventStream("task-2", timestamp_factory=fixed_timestamp)
    first = stream.publish(
        "stage",
        {"step": "prepare"},
        broadcaster=record_broadcast,
    )
    second = stream.publish("progress", {"progress": 25})

    assert first["id"] == 1
    assert second["id"] == 2
    assert broadcast_calls == [("task-2", first)]

    full_history = stream.history_since(None)
    assert full_history == [first, second]
    history_after_first = stream.history_since(1)
    assert history_after_first == [second]
    assert stream.last_event_id == 2
def test_task_event_stream_respects_max_history():
    """With ``max_history=2``, only the last two of three events are kept."""
    stream = TaskEventStream(
        "task-3",
        max_history=2,
        timestamp_factory=lambda: "2026-04-23T00:00:00Z",
    )

    published = [stream.publish("stage", {"step": step}) for step in (1, 2, 3)]

    # The first published event is expected to have been dropped.
    assert stream.history_since(None) == published[1:]
def test_format_sse_serializes_unicode_and_defaults():
    """Check the rendered SSE frame's header lines, payload text and terminator.

    The non-ASCII message must appear verbatim in the output, not as
    escape sequences.
    """
    event = {"id": 7, "type": "warning", "payload": {"message": "报告完成"}}
    expected_prefix = "id: 7\nevent: warning\ndata: "

    rendered = format_sse(event)

    assert rendered.startswith(expected_prefix)
    assert '"message": "报告完成"' in rendered
    assert rendered.endswith("\n\n")
def test_build_heartbeat_event_uses_task_status_payload():
    """Check the heartbeat event's fields and the ``hb-`` prefix of its id."""
    heartbeat = build_heartbeat_event(
        "task-4",
        status="running",
        timestamp="2026-04-23T12:34:56Z",
    )

    # Checked in this order: type, task_id, timestamp, payload.
    expected_fields = {
        "type": "heartbeat",
        "task_id": "task-4",
        "timestamp": "2026-04-23T12:34:56Z",
        "payload": {"status": "running"},
    }
    for field_name, expected_value in expected_fields.items():
        assert heartbeat[field_name] == expected_value

    assert str(heartbeat["id"]).startswith("hb-")