test_socket_events.py
1.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from __future__ import annotations

from collections.abc import Callable

from apps.web_api.interfaces import socket_events
class _FakeSocketIO:
    """Minimal Socket.IO stand-in that only records registered handlers.

    ``on(event)`` returns a decorator which stores the decorated callable in
    ``handlers`` under ``event``, so a test can look the handler up by name
    and invoke it directly.
    """

    def __init__(self) -> None:
        # Event name -> handler. Registering the same event again replaces
        # the previously stored handler.
        self.handlers: dict[str, Callable[..., object]] = {}

    def on(
        self, event: str
    ) -> Callable[[Callable[..., object]], Callable[..., object]]:
        """Return a decorator that registers a handler for ``event``.

        The decorator hands the handler back unchanged, so the decorated
        name still refers to the original callable.
        """

        def decorator(handler: Callable[..., object]) -> Callable[..., object]:
            self.handlers[event] = handler
            return handler

        return decorator
def _build_process_status_getter(
    calls: list[str],
) -> Callable[[], dict[str, dict[str, object]]]:
    """Build a fake ``get_process_status`` callable that records its calls.

    Args:
        calls: List that receives the string ``"get_process_status"`` every
            time the returned getter is invoked.

    Returns:
        A zero-argument callable that returns a fixed snapshot containing a
        single ``"query"`` entry with status ``"running"`` and port ``18503``.
    """

    def _get_process_status() -> dict[str, dict[str, object]]:
        calls.append("get_process_status")
        return {"query": {"status": "running", "port": 18503}}

    return _get_process_status
def test_connect_handler_emits_connected_status(monkeypatch):
    """The registered ``connect`` handler emits one ``status`` message."""
    socketio = _FakeSocketIO()
    emitted: list[tuple[str, object]] = []

    # Swap the ``emit`` attribute of ``socket_events`` for a recorder so every
    # (event, payload) pair passed to it is captured in ``emitted``.
    monkeypatch.setattr(
        socket_events,
        "emit",
        lambda event, payload: emitted.append((event, payload)),
    )
    socket_events.register_socketio_handlers(
        socketio,
        socket_events.SocketEventDependencies(
            get_process_status=_build_process_status_getter([]),
        ),
    )

    # Invoke the handler that registration stored under "connect".
    socketio.handlers["connect"]()

    assert emitted == [("status", "Connected to Flask server")]
def test_request_status_emits_runtime_status_snapshot(monkeypatch):
    """``request_status`` queries process status once and emits the result."""
    socketio = _FakeSocketIO()
    call_order: list[str] = []
    emitted: list[tuple[str, object]] = []

    # Swap the ``emit`` attribute of ``socket_events`` for a recorder so every
    # (event, payload) pair passed to it is captured in ``emitted``.
    monkeypatch.setattr(
        socket_events,
        "emit",
        lambda event, payload: emitted.append((event, payload)),
    )
    socket_events.register_socketio_handlers(
        socketio,
        socket_events.SocketEventDependencies(
            get_process_status=_build_process_status_getter(call_order),
        ),
    )

    # Invoke the handler that registration stored under "request_status".
    socketio.handlers["request_status"]()

    # The injected getter must have been called exactly once, and its return
    # value forwarded as the payload of a single "status_update" emission.
    assert call_order == ["get_process_status"]
    assert emitted == [
        ("status_update", {"query": {"status": "running", "port": 18503}})
    ]