process_registry.py 4.89 KB
"""Runtime registry for managed process state."""

from __future__ import annotations

from typing import Any, Iterable

ProcessEntry = dict[str, Any]
ProcessTable = dict[str, ProcessEntry]


def build_default_process_table() -> ProcessTable:
    """Build the default runtime process state table."""
    return {
        "insight": {
            "process": None,
            "port": 8501,
            "status": "stopped",
            "output": [],
            "log_file": None,
            "healthcheck_started_at": None,
        },
        "media": {
            "process": None,
            "port": 8502,
            "status": "stopped",
            "output": [],
            "log_file": None,
            "healthcheck_started_at": None,
        },
        "query": {
            "process": None,
            "port": 8503,
            "status": "stopped",
            "output": [],
            "log_file": None,
            "healthcheck_started_at": None,
        },
        "forum": {
            "process": None,
            "port": None,
            "status": "stopped",
            "output": [],
            "log_file": None,
        },
    }


def _clone_entry(entry: ProcessEntry) -> ProcessEntry:
    cloned: ProcessEntry = {}
    for key, value in entry.items():
        if isinstance(value, list):
            cloned[key] = list(value)
        elif isinstance(value, dict):
            cloned[key] = dict(value)
        else:
            cloned[key] = value
    return cloned


class ProcessRuntimeRegistry:
    """Explicit wrapper around the managed process runtime table."""

    def __init__(self, table: ProcessTable | None = None) -> None:
        self._table = table if table is not None else build_default_process_table()

    @property
    def table(self) -> ProcessTable:
        return self._table

    def items(self):
        return self._table.items()

    def contains(self, app_name: str) -> bool:
        return app_name in self._table

    def get_entry(self, app_name: str) -> ProcessEntry:
        return self._table[app_name]

    def snapshot(self, app_name: str) -> ProcessEntry:
        return _clone_entry(self.get_entry(app_name))

    def snapshot_all(self) -> ProcessTable:
        return {name: self.snapshot(name) for name in self._table}

    def status_snapshot(self, *, include_output_lines: bool = False) -> dict[str, dict[str, Any]]:
        snapshot: dict[str, dict[str, Any]] = {}
        for app_name, info in self._table.items():
            entry = {
                "status": info.get("status"),
                "port": info.get("port"),
            }
            if include_output_lines:
                entry["output_lines"] = len(info.get("output", []))
            snapshot[app_name] = entry
        return snapshot

    def get_process(self, app_name: str):
        return self.get_entry(app_name).get("process")

    def set_process(self, app_name: str, process: Any) -> None:
        self.get_entry(app_name)["process"] = process

    def clear_process(self, app_name: str) -> None:
        self.get_entry(app_name)["process"] = None

    def get_port(self, app_name: str) -> int | None:
        port = self.get_entry(app_name).get("port")
        return int(port) if port is not None else None

    def get_status(self, app_name: str) -> str:
        return str(self.get_entry(app_name).get("status", ""))

    def set_status(self, app_name: str, status: str) -> None:
        self.get_entry(app_name)["status"] = status

    def set_output(self, app_name: str, output: list[str]) -> None:
        self.get_entry(app_name)["output"] = list(output)

    def append_output(self, app_name: str, line: str) -> None:
        self.get_entry(app_name).setdefault("output", []).append(line)

    def set_healthcheck_started_at(self, app_name: str, started_at: float | None) -> None:
        self.get_entry(app_name)["healthcheck_started_at"] = started_at

    def get_healthcheck_started_at(self, app_name: str) -> float | None:
        value = self.get_entry(app_name).get("healthcheck_started_at")
        return float(value) if value is not None else None

    def reset_runtime(
        self,
        app_name: str,
        *,
        status: str = "stopped",
        clear_output: bool = False,
    ) -> None:
        entry = self.get_entry(app_name)
        entry["process"] = None
        entry["status"] = status
        if "healthcheck_started_at" in entry:
            entry["healthcheck_started_at"] = None
        if clear_output:
            entry["output"] = []

    def running_apps(self, app_names: Iterable[str]) -> list[str]:
        return [name for name in app_names if self.contains(name) and self.get_status(name) == "running"]

    def active_ports(self) -> list[str]:
        ports: list[str] = []
        for app_name, info in self._table.items():
            port = info.get("port")
            if port:
                ports.append(f"{app_name}:{port}")
        return ports


PROCESS_RUNTIME_REGISTRY = ProcessRuntimeRegistry()