# system_lifecycle.py
"""System startup and shutdown orchestration for the web API runtime."""

from __future__ import annotations

import os
import threading
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, Mapping

from loguru import logger

from apps.web_api.runtime.process_manager import (
    ProcessCleanupPort,
    log_shutdown_step,
)
from apps.web_api.runtime.engine_registry import ENGINE_RUNTIME_REGISTRY, EngineRuntimeRegistry
from apps.web_api.runtime.forum_runtime import ForumRuntime, build_forum_runtime
from apps.web_api.runtime.process_manager import ProcessManager, build_process_manager
from apps.web_api.runtime.process_registry import PROCESS_RUNTIME_REGISTRY, ProcessRuntimeRegistry
from apps.web_api.runtime.system_state import SYSTEM_STATE_REGISTRY, SystemStateRegistry

# Zero-argument callable invoked to stop the Socket.IO server before the host process exits.
SocketStopper = Callable[[], None]
# Output callback taking a string and a dict; forwarded as ``emit_output=`` to the
# Streamlit and forum starters (payload schema is defined by those callees).
EmitOutput = Callable[[str, dict], None]
# Returns string descriptions of the tracked child processes; the strings are
# joined into shutdown log lines and response messages.
RunningChildrenDescriber = Callable[[], list[str]]
# Receives one human-readable message per shutdown step.
ShutdownLogger = Callable[[str], None]
# Zero-argument callable run once after all components have been started.
StatusChecker = Callable[[], None]
# Cleanup routine; this module calls it with ``cleanup=`` and ``timeout=`` keywords.
CleanupConcurrentHandler = Callable[..., None]
# Starts one Streamlit app and returns ``(success, message)``.
StreamlitAppStarter = Callable[..., tuple[bool, str]]
# Called with ``(app_name, <int>)`` and returns ``(success, message)``; the int is
# presumably a timeout in seconds -- confirm against the process manager.
StreamlitStartupWaiter = Callable[[str, int], tuple[bool, str]]
# Starts the forum engine; a truthy result is treated as a successful start.
ForumEngineStarter = Callable[..., bool]
# Stops the forum engine; handed to the cleanup port during shutdown.
ForumEngineStopper = Callable[..., None]


@dataclass(frozen=True)
class SystemLifecycleDependencies:
    """Runtime callables consumed by ``SystemLifecycleService``.

    The dataclass is frozen, so a bundle cannot be mutated after construction.
    Bundling the collaborators lets callers inject replacements instead of the
    defaults resolved by ``build_default_system_lifecycle_dependencies``.
    """

    # Looked up by app name (``.get(app_name)``) to obtain that app's startup script.
    streamlit_scripts: Mapping[str, str]
    # Called as ``(app_name, script_path, port, log_dir=..., emit_output=...)``;
    # returns ``(success, message)``.
    start_streamlit_app: StreamlitAppStarter
    # Called after a successful start to confirm the app came up;
    # returns ``(success, message)``.
    wait_for_app_startup: StreamlitStartupWaiter
    # Invoked once at the end of component initialization; its result is ignored.
    check_app_status: StatusChecker
    # Called with ``emit_output=...``; a falsy result is recorded as a startup error.
    start_forum_engine: ForumEngineStarter
    # Passed into the process cleanup port built during shutdown.
    stop_forum_engine: ForumEngineStopper
    # Called with ``cleanup=<ProcessCleanupPort>`` and ``timeout=<float>`` from the
    # background shutdown thread.
    cleanup_processes_concurrent: CleanupConcurrentHandler
    # Supplies the process descriptions embedded in shutdown messages.
    describe_running_children: RunningChildrenDescriber
    # Sink for shutdown progress messages.
    log_shutdown_step: ShutdownLogger


def build_process_cleanup_port(
    *,
    stop_forum_engine: Callable[[], Any],
    system_state_registry: SystemStateRegistry,
) -> ProcessCleanupPort:
    """Assemble the cleanup port shared by the bootstrap and shutdown flows.

    Args:
        stop_forum_engine: Callable stored on the port as its forum-engine stopper.
        system_state_registry: Registry whose ``set_state`` is called with
            ``started=False, starting=False`` when the port marks the system
            as stopped.

    Returns:
        A ``ProcessCleanupPort`` wired with the two callbacks above.
    """

    def _mark_system_stopped() -> Any:
        # Clear both lifecycle flags in one call; pass through whatever
        # ``set_state`` returns.
        return system_state_registry.set_state(started=False, starting=False)

    return ProcessCleanupPort(
        stop_forum_engine=stop_forum_engine,
        mark_system_stopped=_mark_system_stopped,
    )


def build_default_system_lifecycle_dependencies(
    *,
    process_registry: ProcessRuntimeRegistry | None = None,
    engine_registry: EngineRuntimeRegistry | None = None,
    process_manager: ProcessManager | None = None,
    forum_runtime: ForumRuntime | None = None,
) -> SystemLifecycleDependencies:
    """Resolve the default lifecycle collaborators when called, not at import.

    Every argument is optional; an omitted (``None``) argument falls back to
    the module-level registry or to a freshly built manager/runtime.

    Args:
        process_registry: Registry handed to a newly built process manager;
            defaults to ``PROCESS_RUNTIME_REGISTRY``.
        engine_registry: Source of the Streamlit script mapping for a newly
            built process manager; defaults to ``ENGINE_RUNTIME_REGISTRY``.
        process_manager: Pre-built manager. When given, neither registry is
            used to build one.
        forum_runtime: Pre-built forum runtime. When omitted, one is built on
            the process manager's own ``process_registry``.

    Returns:
        A ``SystemLifecycleDependencies`` bundle bound to the resolved
        process manager and forum runtime.
    """

    registry = process_registry
    if registry is None:
        registry = PROCESS_RUNTIME_REGISTRY

    engines = engine_registry
    if engines is None:
        engines = ENGINE_RUNTIME_REGISTRY

    manager = process_manager
    if manager is None:
        # The engine registry is only consulted when a manager must be built.
        manager = build_process_manager(
            process_registry=registry,
            streamlit_scripts=engines.streamlit_scripts(),
        )

    forum = forum_runtime
    if forum is None:
        # Use the manager's registry so an injected manager and the forum
        # runtime share the same process registry.
        forum = build_forum_runtime(process_registry=manager.process_registry)

    return SystemLifecycleDependencies(
        streamlit_scripts=manager.streamlit_scripts,
        start_streamlit_app=manager.start_streamlit_app,
        wait_for_app_startup=manager.wait_for_app_startup,
        check_app_status=manager.check_app_status,
        start_forum_engine=forum.start_engine,
        stop_forum_engine=forum.stop_engine,
        cleanup_processes_concurrent=manager.cleanup_processes_concurrent,
        describe_running_children=manager.describe_running_children,
        log_shutdown_step=log_shutdown_step,
    )


class SystemLifecycleService:
    """Coordinate runtime startup, shutdown, and forced process exit.

    The service starts the Streamlit apps and the forum engine, tracks the
    outcome in the system state registry, and on shutdown runs process
    cleanup in a background thread before terminating the host process with
    ``os._exit``.
    """

    def __init__(
        self,
        *,
        log_dir: Path,
        emit_output: EmitOutput,
        stop_socketio: SocketStopper,
        process_registry: ProcessRuntimeRegistry | None = None,
        system_state_registry: SystemStateRegistry | None = None,
        dependencies: SystemLifecycleDependencies | None = None,
    ) -> None:
        """Store collaborators, falling back to module-level defaults.

        Args:
            log_dir: Directory forwarded as ``log_dir=`` when starting each
                Streamlit app.
            emit_output: Output callback forwarded to the app and forum starters.
            stop_socketio: Callable invoked to stop Socket.IO right before the
                host process exits.
            process_registry: Registry queried for app ports and active ports;
                defaults to ``PROCESS_RUNTIME_REGISTRY``.
            system_state_registry: Registry holding the started/starting flags
                and the shutdown request; defaults to ``SYSTEM_STATE_REGISTRY``.
            dependencies: Pre-built dependency bundle; when omitted the default
                bundle is built on ``process_registry``.
        """
        self._log_dir = log_dir
        self._emit_output = emit_output
        self._stop_socketio = stop_socketio
        self._process_registry = (
            process_registry if process_registry is not None else PROCESS_RUNTIME_REGISTRY
        )
        self._system_state_registry = (
            system_state_registry if system_state_registry is not None else SYSTEM_STATE_REGISTRY
        )
        self._dependencies = (
            dependencies
            if dependencies is not None
            else build_default_system_lifecycle_dependencies(
                process_registry=self._process_registry,
            )
        )

    @staticmethod
    def _compose_shutdown_message(base: str, running: list[str], target_ports: Any) -> str:
        """Append the running-process names and target ports to ``base``.

        Ports are passed through ``str`` before joining so that integer port
        numbers do not make ``str.join`` raise ``TypeError``.
        """
        message = base
        if running:
            message = f"{message}: {', '.join(running)}"
        if target_ports:
            message = f"{message} ({', '.join(map(str, target_ports))})"
        return message

    def initialize_system_components(self) -> tuple[bool, list[str], list[str]]:
        """Start the Streamlit engines and forum runtime.

        Returns:
            ``(success, logs, errors)`` where ``success`` is ``True`` only when
            no error was recorded, ``logs`` holds one progress line per step,
            and ``errors`` holds the failure messages.
        """
        logs: list[str] = []
        errors: list[str] = []

        for app_name in ("insight", "media", "query"):
            script_path = self._dependencies.streamlit_scripts.get(app_name)
            if not script_path:
                # A missing script is reported as both a log line and an error,
                # but does not stop the remaining apps from starting.
                message = f"{app_name} missing startup script configuration"
                logger.warning(message)
                logs.append(message)
                errors.append(message)
                continue

            success, message = self._dependencies.start_streamlit_app(
                app_name,
                script_path,
                self._process_registry.get_port(app_name),
                log_dir=self._log_dir,
                emit_output=self._emit_output,
            )
            logs.append(f"{app_name}: {message}")
            if not success:
                errors.append(f"{app_name}: {message}")
                continue

            # The literal 15 is presumably a timeout in seconds -- confirm
            # against ``wait_for_app_startup``.
            startup_success, startup_message = self._dependencies.wait_for_app_startup(app_name, 15)
            logs.append(f"{app_name} startup check: {startup_message}")
            if not startup_success:
                errors.append(f"{app_name}: {startup_message}")

        try:
            forum_started = self._dependencies.start_forum_engine(emit_output=self._emit_output)
            if forum_started:
                logs.append("forum: ForumEngine started")
            else:
                errors.append("forum: ForumEngine returned an unknown startup result")
        except Exception as exc:  # pragma: no cover
            # A forum failure is recorded as an error instead of aborting the
            # whole initialization.
            logger.exception("ForumEngine startup failed during system initialization")
            errors.append(f"forum: {exc}")

        self._dependencies.check_app_status()
        return not errors, logs, errors

    def start_system(self) -> tuple[dict, int]:
        """Start all runtime components if the system is not already starting.

        Returns:
            ``(payload, status)``. ``status`` is 400 when the state registry
            refuses the start, 200 on success, and 500 when initialization
            reports errors or raises.
        """
        allowed, message = self._system_state_registry.prepare_start()
        if not allowed:
            return {"success": False, "message": message}, 400

        try:
            success, logs, errors = self.initialize_system_components()
            if success:
                self._system_state_registry.set_state(started=True)
                return {"success": True, "message": "system started successfully", "logs": logs}, 200

            self._system_state_registry.set_state(started=False)
            return {
                "success": False,
                "message": "system startup failed",
                "logs": logs,
                "errors": errors,
            }, 500
        except Exception as exc:  # pragma: no cover
            logger.exception("Unexpected error while starting the system")
            self._system_state_registry.set_state(started=False)
            return {"success": False, "message": f"system startup error: {exc}"}, 500
        finally:
            # Always clear the "starting" flag, whichever branch returned.
            self._system_state_registry.set_state(starting=False)

    def shutdown_system(self, *, cleanup_timeout: float = 6.0) -> tuple[dict, int]:
        """Request an asynchronous shutdown of all managed runtime components.

        Args:
            cleanup_timeout: Timeout forwarded to the background cleanup.

        Returns:
            ``(payload, status)``. ``status`` is 400 while the system is still
            starting, 200 when shutdown was requested (or had already been
            requested), and 500 if scheduling the shutdown raised.
        """
        state = self._system_state_registry.snapshot()
        if state["starting"]:
            return {"success": False, "message": "system is still starting"}, 400

        target_ports = self._process_registry.active_ports()

        if not self._system_state_registry.request_shutdown():
            # A shutdown is already in flight: report progress, do not start another.
            running = self._dependencies.describe_running_children()
            detail = self._compose_shutdown_message(
                "shutdown already requested; waiting for tracked processes",
                running,
                target_ports,
            )
            return {"success": True, "message": detail, "ports": target_ports}, 200

        running = self._dependencies.describe_running_children()
        if running:
            self._dependencies.log_shutdown_step(
                "Starting system shutdown; waiting for processes: " + ", ".join(running)
            )
        else:
            self._dependencies.log_shutdown_step(
                "Starting system shutdown; no tracked child processes detected"
            )

        try:
            self._system_state_registry.set_state(started=False, starting=False)
            self.start_async_shutdown(cleanup_timeout=cleanup_timeout)
            message = self._compose_shutdown_message(
                "shutdown requested; stopping tracked processes",
                running,
                target_ports,
            )
            return {"success": True, "message": message, "ports": target_ports}, 200
        except Exception as exc:  # pragma: no cover
            logger.exception("Unexpected error while shutting down the system")
            return {"success": False, "message": f"system shutdown error: {exc}"}, 500

    def schedule_server_shutdown(self, delay_seconds: float = 0.1) -> None:
        """Stop Socket.IO and force-exit the host process after cleanup.

        The work runs on a daemon thread so the caller returns immediately.

        Args:
            delay_seconds: Time the thread sleeps before stopping Socket.IO.
        """

        def _shutdown() -> None:
            time.sleep(delay_seconds)
            try:
                self._stop_socketio()
            except Exception as exc:  # pragma: no cover
                logger.warning(f"SocketIO stop raised during shutdown: {exc}")

            try:
                self._dependencies.log_shutdown_step(
                    "SocketIO stop requested; exiting the host process"
                )
            finally:
                # Exit even if the log sink raises; otherwise the process
                # would stay alive after a requested shutdown.
                os._exit(0)

        threading.Thread(target=_shutdown, daemon=True).start()

    def start_async_shutdown(self, cleanup_timeout: float = 3.0) -> None:
        """Run process cleanup in a background thread and then exit.

        A watchdog timer forces the process to exit ``cleanup_timeout + 2.0``
        seconds after this call if the cleanup thread has not finished.

        Args:
            cleanup_timeout: Timeout passed to ``cleanup_processes_concurrent``.
        """
        self._dependencies.log_shutdown_step(
            f"Received shutdown request; starting async cleanup (timeout={cleanup_timeout}s)"
        )

        def _force_exit() -> None:
            try:
                self._dependencies.log_shutdown_step("Shutdown timed out; forcing process exit")
            finally:
                # The watchdog must terminate the process even if logging fails.
                os._exit(0)

        # Grace period on top of the cleanup timeout before the hard exit.
        hard_timeout = cleanup_timeout + 2.0
        force_timer = threading.Timer(hard_timeout, _force_exit)
        force_timer.daemon = True
        force_timer.start()

        def _cleanup_and_exit() -> None:
            try:
                self._dependencies.cleanup_processes_concurrent(
                    cleanup=build_process_cleanup_port(
                        stop_forum_engine=self._dependencies.stop_forum_engine,
                        system_state_registry=self._system_state_registry,
                    ),
                    timeout=cleanup_timeout,
                )
            except Exception as exc:  # pragma: no cover
                logger.exception(f"Shutdown cleanup raised: {exc}")
            finally:
                # Whether cleanup succeeded or not, proceed to the host exit.
                self._dependencies.log_shutdown_step(
                    "Cleanup thread finished; scheduling host process exit"
                )
                self.schedule_server_shutdown(0.05)

        threading.Thread(target=_cleanup_and_exit, daemon=True).start()


# Public names of this module; this list controls what ``from <module> import *`` exports.
__all__ = [
    "build_process_cleanup_port",
    "SystemLifecycleDependencies",
    "SystemLifecycleService",
    "build_default_system_lifecycle_dependencies",
]