# test_process_manager_cleanup.py
from __future__ import annotations

import subprocess

from apps.web_api.runtime import process_manager
from apps.web_api.runtime.engine_registry import EngineRuntimeRegistry, build_default_engine_catalog
from apps.web_api.runtime.process_manager import ProcessManager
from apps.web_api.runtime.process_registry import ProcessRuntimeRegistry


def _build_process_registry() -> ProcessRuntimeRegistry:
    """Return a new registry seeded with four app entries, all marked ``running``.

    Each entry starts with no process object and an empty output list. The
    three streamlit apps get fixed ports; ``forum`` has no port.
    """
    ports_by_app = {"insight": 18501, "media": 18502, "query": 18503, "forum": None}
    seed_state = {
        app_name: {"process": None, "port": port, "status": "running", "output": []}
        for app_name, port in ports_by_app.items()
    }
    return ProcessRuntimeRegistry(seed_state)


def test_process_manager_describe_running_children_uses_injected_registry():
    """A manager given an explicit registry reports the children held by that registry."""

    class _RunningProcess:
        """Minimal process stand-in: fixed pid, and ``poll`` always returns ``None``."""

        pid = 4321

        @staticmethod
        def poll():
            return None

    injected_registry = _build_process_registry()
    injected_registry.set_process("query", _RunningProcess())

    described = ProcessManager(process_registry=injected_registry).describe_running_children()

    # Only "query" has a process object attached, so it is the only child listed.
    assert described == ["query(pid=4321, port=18503)"]


def test_cleanup_processes_marks_system_stopped_even_when_forum_stop_fails(monkeypatch):
    """Cleanup still stops every app and marks the system stopped when the forum stop raises."""
    registry = _build_process_registry()
    manager = ProcessManager(process_registry=registry)

    stopped_apps: list[str] = []
    system_marks: list[str] = []

    def _record_stop(app_name):
        # Stand-in for stop_streamlit_app: remember the app and report success.
        stopped_apps.append(app_name)
        return True, f"{app_name} stopped"

    def _failing_forum_stop():
        raise RuntimeError("forum stop failed")

    def _record_mark():
        system_marks.append("marked")

    monkeypatch.setattr(manager, "stop_streamlit_app", _record_stop)

    cleanup_port = process_manager.ProcessCleanupPort(
        stop_forum_engine=_failing_forum_stop,
        mark_system_stopped=_record_mark,
    )

    manager.cleanup_processes(cleanup=cleanup_port)

    assert stopped_apps == ["insight", "media", "query"]
    assert registry.get_status("forum") == "stopped"
    assert system_marks == ["marked"]


def test_cleanup_processes_concurrent_resets_lingering_processes_and_marks_system_stopped(monkeypatch):
    """Concurrent cleanup terminates, then kills, a process that outlives its stop call.

    Threads are replaced by a synchronous stand-in so the whole cleanup runs
    inline, and the "insight" slot holds a process stub whose ``wait`` keeps
    timing out until ``kill`` has been called.
    """

    class _ImmediateThread:
        """Thread stand-in that runs its target synchronously inside ``start``."""

        def __init__(self, *, target, args=(), daemon):
            self.daemon = daemon
            self._fn = target
            self._fn_args = args

        def start(self) -> None:
            # Errors raised by the target are deliberately discarded so they
            # never reach the code that called start().
            try:
                self._fn(*self._fn_args)
            except Exception:
                pass

        def join(self, timeout=None) -> None:
            # Nothing to wait for: the target already ran inside start().
            return None

    class _LingeringProcess:
        """Process stand-in that stays alive until ``kill`` is called."""

        def __init__(self) -> None:
            self.pid = 4242
            self.terminate_calls = 0
            self.kill_calls = 0
            self.wait_calls: list[float | None] = []
            self._alive = True

        def poll(self):
            if self._alive:
                return None
            return 0

        def terminate(self) -> None:
            # Only counted; the stub remains alive after terminate().
            self.terminate_calls += 1

        def kill(self) -> None:
            self.kill_calls += 1
            self._alive = False

        def wait(self, timeout=None) -> int:
            self.wait_calls.append(timeout)
            if self.kill_calls:
                self._alive = False
                return 0
            # Until kill() has been called, every wait times out.
            raise subprocess.TimeoutExpired(cmd="streamlit", timeout=timeout)

    registry = _build_process_registry()
    manager = ProcessManager(process_registry=registry)

    stop_calls: list[str] = []
    forum_stop_calls: list[str] = []
    mark_calls: list[str] = []

    def _record_stop(app_name):
        # Stand-in for stop_streamlit_app: remember the app and report success.
        stop_calls.append(app_name)
        return True, f"{app_name} stopping"

    def _record_forum_stop():
        forum_stop_calls.append("forum")

    def _record_mark():
        mark_calls.append("marked")

    lingering_process = _LingeringProcess()
    registry.set_process("insight", lingering_process)

    monkeypatch.setattr(process_manager.threading, "Thread", _ImmediateThread)
    monkeypatch.setattr(manager, "stop_streamlit_app", _record_stop)

    cleanup_port = process_manager.ProcessCleanupPort(
        stop_forum_engine=_record_forum_stop,
        mark_system_stopped=_record_mark,
    )

    manager.cleanup_processes_concurrent(cleanup=cleanup_port, timeout=0.5)

    assert stop_calls == ["insight", "media", "query"]
    assert forum_stop_calls == ["forum"]
    # One terminate, one kill, and a wait(1) after each of them.
    assert lingering_process.terminate_calls == 1
    assert lingering_process.kill_calls == 1
    assert lingering_process.wait_calls == [1, 1]
    # The lingering slot is cleared and everything ends up marked stopped.
    assert registry.get_process("insight") is None
    assert registry.get_status("insight") == "stopped"
    assert registry.get_status("forum") == "stopped"
    assert mark_calls == ["marked"]


def test_process_manager_without_injected_registry_uses_runtime_module_registry(monkeypatch):
    """A manager built without a registry exposes the module's ``PROCESS_RUNTIME_REGISTRY``.

    The manager is created before the module attribute is patched, so the test
    expects the patched registry to be picked up afterwards.
    """

    class _RunningProcess:
        """Minimal process stand-in: fixed pid, and ``poll`` always returns ``None``."""

        pid = 2468

        @staticmethod
        def poll():
            return None

    module_registry = _build_process_registry()
    manager = ProcessManager()

    module_registry.set_process("media", _RunningProcess())
    monkeypatch.setattr(process_manager, "PROCESS_RUNTIME_REGISTRY", module_registry)

    assert manager.process_registry is module_registry
    expected_children = ["media(pid=2468, port=18502)"]
    assert manager.describe_running_children() == expected_children


def test_streamlit_scripts_compat_view_tracks_current_engine_registry(monkeypatch):
    """``STREAMLIT_SCRIPTS`` reflects the script path of the currently patched engine registry."""
    custom_script = "custom/query_app.py"

    patched_registry = EngineRuntimeRegistry(build_default_engine_catalog())
    patched_registry.catalog["query"]["script_path"] = custom_script
    monkeypatch.setattr(process_manager, "ENGINE_RUNTIME_REGISTRY", patched_registry)

    # Check both direct item access and a full conversion to a plain dict.
    assert process_manager.STREAMLIT_SCRIPTS["query"] == custom_script
    assert dict(process_manager.STREAMLIT_SCRIPTS)["query"] == custom_script