test_stream_runtime.py 2.67 KB
from queue import Empty

from services.engines.report.stream_runtime import (
    STREAM_HEARTBEAT_INTERVAL,
    STREAM_IDLE_TIMEOUT,
    STREAM_TERMINAL_STATUSES,
    TaskEventStream,
    StreamSubscriberRegistry,
    build_heartbeat_event,
    format_sse,
)


def test_constants_match_report_stream_contract():
    assert STREAM_HEARTBEAT_INTERVAL == 15
    assert STREAM_IDLE_TIMEOUT == 120
    assert STREAM_TERMINAL_STATUSES == {"completed", "error", "cancelled"}


def test_registry_register_broadcast_and_unregister():
    registry = StreamSubscriberRegistry()
    subscriber_a = registry.register("task-1")
    subscriber_b = registry.register("task-1")
    event = {"id": 1, "type": "progress", "payload": {"value": 50}}

    delivered = registry.broadcast("task-1", event)

    assert delivered == 2
    assert subscriber_a.get_nowait() == event
    assert subscriber_b.get_nowait() == event

    registry.unregister("task-1", subscriber_a)
    registry.unregister("task-1", subscriber_b)

    assert registry.subscriber_count("task-1") == 0


def test_task_event_stream_publishes_history_and_calls_broadcaster():
    captured = []
    stream = TaskEventStream("task-2", timestamp_factory=lambda: "2026-04-23T00:00:00Z")

    first = stream.publish("stage", {"step": "prepare"}, broadcaster=lambda task_id, event: captured.append((task_id, event)))
    second = stream.publish("progress", {"progress": 25})

    assert first["id"] == 1
    assert second["id"] == 2
    assert captured == [("task-2", first)]
    assert stream.history_since(None) == [first, second]
    assert stream.history_since(1) == [second]
    assert stream.last_event_id == 2


def test_task_event_stream_respects_max_history():
    stream = TaskEventStream("task-3", max_history=2, timestamp_factory=lambda: "2026-04-23T00:00:00Z")

    stream.publish("stage", {"step": 1})
    second = stream.publish("stage", {"step": 2})
    third = stream.publish("stage", {"step": 3})

    assert stream.history_since(None) == [second, third]


def test_format_sse_serializes_unicode_and_defaults():
    rendered = format_sse({"id": 7, "type": "warning", "payload": {"message": "报告完成"}})

    assert rendered.startswith("id: 7\nevent: warning\ndata: ")
    assert '"message": "报告完成"' in rendered
    assert rendered.endswith("\n\n")


def test_build_heartbeat_event_uses_task_status_payload():
    heartbeat = build_heartbeat_event("task-4", status="running", timestamp="2026-04-23T12:34:56Z")

    assert heartbeat["type"] == "heartbeat"
    assert heartbeat["task_id"] == "task-4"
    assert heartbeat["timestamp"] == "2026-04-23T12:34:56Z"
    assert heartbeat["payload"] == {"status": "running"}
    assert str(heartbeat["id"]).startswith("hb-")