# status_service.py
"""Runtime status query helpers for the web API."""

from __future__ import annotations

from typing import Any, Callable

from apps.web_api.runtime.process_registry import ProcessRuntimeRegistry
from apps.web_api.runtime.system_state import SystemStateRegistry

# Signature of the refresh hook accepted by RuntimeStatusService: a callable
# that takes no arguments and returns nothing.
StatusRefreshHandler = Callable[[], None]


class RuntimeStatusService:
    """Read-only facade over the process registry and the system state registry.

    Callers obtain process snapshots and the system status payload from this
    one object instead of talking to each registry directly.
    """

    def __init__(
        self,
        *,
        process_registry: ProcessRuntimeRegistry,
        system_state_registry: SystemStateRegistry,
        refresh_process_status: StatusRefreshHandler,
    ) -> None:
        """Store the collaborators; nothing is queried at construction time.

        Args:
            process_registry: Registry asked for process status snapshots.
            system_state_registry: Registry asked for the system state.
            refresh_process_status: Zero-argument hook invoked before every
                process status snapshot is read.
        """
        self._refresh_process_status = refresh_process_status
        self._system_state_registry = system_state_registry
        self._process_registry = process_registry

    def get_process_status_snapshot(
        self,
        *,
        include_output_lines: bool = False,
    ) -> dict[str, dict[str, Any]]:
        """Run the refresh hook, then return the registry's status snapshot.

        Args:
            include_output_lines: Forwarded unchanged to the registry's
                ``status_snapshot`` call.

        Returns:
            Whatever the process registry's ``status_snapshot`` returns.
        """
        # The hook must run first so the snapshot is taken after the refresh.
        self._refresh_process_status()
        registry = self._process_registry
        snapshot = registry.status_snapshot(
            include_output_lines=include_output_lines,
        )
        return snapshot

    def get_system_status_payload(self) -> dict[str, Any]:
        """Build the system status payload from the current system state.

        Returns:
            A dict with ``"success"`` set to ``True`` plus the ``"started"``
            and ``"starting"`` values copied from the state snapshot.
        """
        current = self._system_state_registry.snapshot()
        started = current["started"]
        starting = current["starting"]
        payload: dict[str, Any] = {"success": True}
        payload["started"] = started
        payload["starting"] = starting
        return payload


# Names exported by ``from <module> import *``; only the service class is listed.
__all__ = ["RuntimeStatusService"]