test_system_lifecycle.py 10.8 KB
from __future__ import annotations

from pathlib import Path

from apps.web_api.runtime import system_lifecycle
from apps.web_api.runtime.process_registry import ProcessRuntimeRegistry
from apps.web_api.runtime.system_state import SystemStateRegistry


def _build_process_registry() -> ProcessRuntimeRegistry:
    """Return a fresh process registry with every managed app marked stopped.

    The three Streamlit apps get fixed test ports; the forum engine has no port.
    Each entry receives its own empty ``output`` list.
    """
    app_ports = {"insight": 18501, "media": 18502, "query": 18503, "forum": None}
    entries = {
        app: {"process": None, "port": port, "status": "stopped", "output": []}
        for app, port in app_ports.items()
    }
    return ProcessRuntimeRegistry(entries)


def _build_dependencies(captured: dict[str, object]) -> system_lifecycle.SystemLifecycleDependencies:
    """Build lifecycle dependencies whose callables only record how they were invoked.

    Every fake appends an entry to ``captured[<callable name>]``, creating the
    list on first use, so tests can assert exactly how the service drove it.
    The launch/health fakes always report success.
    """

    def _record(key: str, entry: object) -> None:
        # Lazily create the per-callable call log, then append this invocation.
        captured.setdefault(key, []).append(entry)  # type: ignore[attr-defined]

    def start_streamlit_app(app_name, script_path, port, *, log_dir, emit_output):
        _record("start_streamlit_app", (app_name, script_path, port, log_dir))
        # Echo an event through the hook the service passed in, so tests can
        # observe that the service forwards its own emitter.
        emit_output("runtime", {"app": app_name, "port": port})
        return True, f"{app_name} started"

    def wait_for_app_startup(app_name, timeout_seconds):
        _record("wait_for_app_startup", (app_name, timeout_seconds))
        return True, f"{app_name} healthy"

    def start_forum_engine(*, emit_output):
        _record("start_forum_engine", "called")
        emit_output("forum", {"status": "started"})
        return True

    def stop_forum_engine(*, emit_output=None):
        # Record whichever emit hook (possibly None) the caller supplied.
        _record("stop_forum_engine", emit_output)

    def check_app_status() -> None:
        _record("check_app_status", "checked")

    def describe_running_children() -> list[str]:
        _record("describe_running_children", "called")
        return ["insight(321)", "media(654)"]

    def log_shutdown_step(message: str) -> None:
        _record("log_shutdown_step", message)

    def cleanup_processes_concurrent(*, cleanup, timeout):
        _record("cleanup_processes_concurrent", {"cleanup": cleanup, "timeout": timeout})

    scripts = {app: f"{app}_app.py" for app in ("insight", "media", "query")}

    return system_lifecycle.SystemLifecycleDependencies(
        streamlit_scripts=scripts,
        start_streamlit_app=start_streamlit_app,
        wait_for_app_startup=wait_for_app_startup,
        check_app_status=check_app_status,
        start_forum_engine=start_forum_engine,
        stop_forum_engine=stop_forum_engine,
        cleanup_processes_concurrent=cleanup_processes_concurrent,
        describe_running_children=describe_running_children,
        log_shutdown_step=log_shutdown_step,
    )


def _build_service(
    *,
    process_registry: ProcessRuntimeRegistry | None = None,
    system_state_registry: SystemStateRegistry | None = None,
    dependencies: system_lifecycle.SystemLifecycleDependencies | None = None,
):
    """Construct a ``SystemLifecycleService`` wired to in-memory recorders.

    Any collaborator left as ``None`` is replaced by a fresh default. Explicit
    ``is None`` checks are used instead of ``or`` so that a caller-supplied
    object is never silently swapped out just because it evaluates falsy
    (e.g. a container-like registry that defines ``__len__``). Otherwise the
    test would assert against an object the service never used.

    Args:
        process_registry: Registry to inject; defaults to ``_build_process_registry()``.
        system_state_registry: State registry to inject; defaults to a new ``SystemStateRegistry``.
        dependencies: Lifecycle dependencies to inject; defaults to recording fakes
            that write into a throwaway dict.

    Returns:
        ``(service, emitted, stopped)``. ``emitted`` collects every
        ``(event, payload)`` passed to ``emit_output``. ``stopped`` gains one
        ``"called"`` entry per ``stop_socketio`` invocation.
    """
    if process_registry is None:
        process_registry = _build_process_registry()
    if system_state_registry is None:
        system_state_registry = SystemStateRegistry()
    if dependencies is None:
        dependencies = _build_dependencies({})

    emitted: list[tuple[str, dict]] = []
    stopped: list[str] = []
    service = system_lifecycle.SystemLifecycleService(
        log_dir=Path("."),
        emit_output=lambda event, payload: emitted.append((event, payload)),
        stop_socketio=lambda: stopped.append("called"),
        process_registry=process_registry,
        system_state_registry=system_state_registry,
        dependencies=dependencies,
    )
    return service, emitted, stopped


def test_start_system_happy_path_uses_injected_dependencies():
    """A successful start drives every injected hook and leaves the system marked started."""
    calls: dict[str, object] = {}
    states = SystemStateRegistry()
    service, emitted, _ = _build_service(
        process_registry=_build_process_registry(),
        system_state_registry=states,
        dependencies=_build_dependencies(calls),
    )

    payload, status_code = service.start_system()

    streamlit_apps = [("insight", 18501), ("media", 18502), ("query", 18503)]
    log_dir = Path(".")

    assert status_code == 200
    assert payload["success"] is True
    assert len(payload["logs"]) == 7
    # Each Streamlit app is launched in order with its script, port and the service log dir.
    assert calls["start_streamlit_app"] == [
        (app, f"{app}_app.py", port, log_dir) for app, port in streamlit_apps
    ]
    assert calls["wait_for_app_startup"] == [(app, 15) for app, _ in streamlit_apps]
    assert calls["start_forum_engine"] == ["called"]
    assert calls["check_app_status"] == ["checked"]
    # Events raised by the fakes must reach the emitter handed to the service.
    assert emitted == [
        *(("runtime", {"app": app, "port": port}) for app, port in streamlit_apps),
        ("forum", {"status": "started"}),
    ]
    assert states.snapshot() == {
        "started": True,
        "starting": False,
        "shutdown_requested": False,
    }


def test_start_system_returns_400_when_registry_is_already_starting(monkeypatch):
    """A start request while another start is in flight is rejected and changes nothing."""
    states = SystemStateRegistry()
    states.set_state(starting=True)
    service, _, _ = _build_service(
        process_registry=_build_process_registry(),
        system_state_registry=states,
    )

    init_calls: list[str] = []

    def _fake_initialize() -> None:
        init_calls.append("called")

    monkeypatch.setattr(service, "initialize_system_components", _fake_initialize)

    payload, status_code = service.start_system()

    assert status_code == 400
    assert payload["success"] is False
    # Rejection must happen before any component initialisation is attempted.
    assert not init_calls
    assert states.snapshot() == {
        "started": False,
        "starting": True,
        "shutdown_requested": False,
    }


def test_shutdown_system_is_idempotent_after_shutdown_requested():
    """Repeated shutdown requests answer identically but trigger async cleanup only once."""
    calls: dict[str, object] = {}
    states = SystemStateRegistry()
    states.set_state(started=True)
    service, _, _ = _build_service(
        process_registry=_build_process_registry(),
        system_state_registry=states,
        dependencies=_build_dependencies(calls),
    )

    requested_timeouts: list[float] = []

    def _record_async_shutdown(cleanup_timeout=3.0):
        requested_timeouts.append(cleanup_timeout)

    # Swap out the real async shutdown so no cleanup thread is started.
    service.start_async_shutdown = _record_async_shutdown  # type: ignore[method-assign]

    responses = [service.shutdown_system(cleanup_timeout=4.5) for _ in range(2)]
    (first_payload, _), (second_payload, _) = responses

    for payload, status in responses:
        assert status == 200
        assert payload["success"] is True
    assert first_payload["ports"] == ["insight:18501", "media:18502", "query:18503"]
    assert second_payload["ports"] == first_payload["ports"]
    assert requested_timeouts == [4.5]
    assert calls["describe_running_children"] == ["called", "called"]
    # Only the first request logs the shutdown step.
    assert calls["log_shutdown_step"] == [
        "Starting system shutdown; waiting for processes: insight(321), media(654)",
    ]
    assert states.snapshot() == {
        "started": False,
        "starting": False,
        "shutdown_requested": True,
    }


def test_start_async_shutdown_uses_injected_cleanup_dependencies(monkeypatch):
    """Async shutdown runs cleanup through the injected hooks and then schedules host exit."""
    calls: dict[str, object] = {}
    service, _, _ = _build_service(
        process_registry=_build_process_registry(),
        system_state_registry=SystemStateRegistry(),
        dependencies=_build_dependencies(calls),
    )
    states = service._system_state_registry

    shutdown_delays: list[float] = []

    def _record_server_shutdown(delay_seconds=0.1):
        shutdown_delays.append(delay_seconds)

    monkeypatch.setattr(service, "schedule_server_shutdown", _record_server_shutdown)

    class _InlineThread:
        """Thread stand-in that runs its target synchronously on ``start()``."""

        def __init__(self, *, target, daemon):
            self._target = target
            self.daemon = daemon

        def start(self) -> None:
            self._target()

    class _RecordingTimer:
        """Timer stand-in that only records its creation and start interval."""

        def __init__(self, interval, function):
            self.interval = interval
            self.function = function
            self.daemon = False
            calls.setdefault("timers", []).append(self)

        def start(self) -> None:
            calls.setdefault("timer_starts", []).append(self.interval)

    monkeypatch.setattr(system_lifecycle.threading, "Thread", _InlineThread)
    monkeypatch.setattr(system_lifecycle.threading, "Timer", _RecordingTimer)

    service.start_async_shutdown(cleanup_timeout=5.5)

    first_cleanup = calls["cleanup_processes_concurrent"][0]
    assert first_cleanup["timeout"] == 5.5
    cleanup_port = first_cleanup["cleanup"]
    assert cleanup_port.stop_forum_engine is service._dependencies.stop_forum_engine
    cleanup_port.mark_system_stopped()
    assert states.snapshot() == {
        "started": False,
        "starting": False,
        "shutdown_requested": False,
    }
    assert calls["timer_starts"] == [7.5]
    assert shutdown_delays == [0.05]
    assert calls["log_shutdown_step"] == [
        "Received shutdown request; starting async cleanup (timeout=5.5s)",
        "Cleanup thread finished; scheduling host process exit",
    ]


def test_system_lifecycle_service_resolves_default_dependencies_at_runtime(monkeypatch):
    """With ``dependencies=None`` the service looks up the module-level default builder."""
    seen: dict[str, object] = {}
    stub_dependencies = _build_dependencies({})

    def _stub_builder(*, process_registry=None, **_kwargs):
        seen["process_registry"] = process_registry
        return stub_dependencies

    def _discard(*_args, **_kwargs):
        return None

    # Patching the module attribute only works if the service resolves it at call time.
    monkeypatch.setattr(
        system_lifecycle,
        "build_default_system_lifecycle_dependencies",
        _stub_builder,
    )

    registry = _build_process_registry()
    service = system_lifecycle.SystemLifecycleService(
        log_dir=Path("."),
        emit_output=_discard,
        stop_socketio=lambda: None,
        process_registry=registry,
        system_state_registry=SystemStateRegistry(),
        dependencies=None,
    )

    assert service._dependencies is stub_dependencies
    assert seen["process_registry"] is registry


def test_build_process_cleanup_port_marks_injected_system_state():
    """The cleanup port delegates forum shutdown and resets the injected state registry."""
    states = SystemStateRegistry()
    states.set_state(started=True, starting=True)
    forum_stops: list[str] = []

    def _stop_forum() -> None:
        forum_stops.append("stopped")

    port = system_lifecycle.build_process_cleanup_port(
        stop_forum_engine=_stop_forum,
        system_state_registry=states,
    )

    port.stop_forum_engine()
    port.mark_system_stopped()

    assert forum_stops == ["stopped"]
    assert states.snapshot() == {
        "started": False,
        "starting": False,
        "shutdown_requested": False,
    }


def test_system_state_registry_can_clear_shutdown_request():
    """``request_shutdown`` returns True and sets the flag; ``clear_shutdown_request`` resets it."""
    states = SystemStateRegistry()

    result = states.request_shutdown()
    assert result is True
    assert states.snapshot()["shutdown_requested"] is True

    states.clear_shutdown_request()
    assert states.snapshot()["shutdown_requested"] is False