test_socket_events.py 1.84 KB
from __future__ import annotations

from apps.web_api.interfaces import socket_events


class _FakeSocketIO:
    def __init__(self) -> None:
        self.handlers: dict[str, object] = {}

    def on(self, event: str):
        def decorator(handler):
            self.handlers[event] = handler
            return handler

        return decorator


def _build_process_status_getter(calls: list[str]):
    def _get_process_status() -> dict[str, dict[str, object]]:
        calls.append("get_process_status")
        return {"query": {"status": "running", "port": 18503}}

    return _get_process_status


def test_connect_handler_emits_connected_status(monkeypatch):
    socketio = _FakeSocketIO()
    emitted: list[tuple[str, object]] = []

    monkeypatch.setattr(
        socket_events,
        "emit",
        lambda event, payload: emitted.append((event, payload)),
    )

    socket_events.register_socketio_handlers(
        socketio,
        socket_events.SocketEventDependencies(
            get_process_status=_build_process_status_getter([]),
        ),
    )

    socketio.handlers["connect"]()

    assert emitted == [("status", "Connected to Flask server")]


def test_request_status_emits_runtime_status_snapshot(monkeypatch):
    socketio = _FakeSocketIO()
    call_order: list[str] = []
    emitted: list[tuple[str, object]] = []

    monkeypatch.setattr(
        socket_events,
        "emit",
        lambda event, payload: emitted.append((event, payload)),
    )

    socket_events.register_socketio_handlers(
        socketio,
        socket_events.SocketEventDependencies(
            get_process_status=_build_process_status_getter(call_order),
        ),
    )

    socketio.handlers["request_status"]()

    assert call_order == ["get_process_status"]
    assert emitted == [
        ("status_update", {"query": {"status": "running", "port": 18503}})
    ]