# runtime.py (13.2 KB)
"""Runtime wiring helpers for the BettaFish web API."""

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable

from flask_socketio import SocketIO

from apps.web_api.bootstrap.search_hooks import SearchHookSource, build_search_hook_bindings
from apps.web_api.interfaces import HttpRouteDependencies, SocketEventDependencies
from apps.web_api.runtime.engine_registry import ENGINE_RUNTIME_REGISTRY, EngineRuntimeRegistry
from apps.web_api.runtime.forum_runtime import ForumRuntime, build_forum_runtime
from apps.web_api.runtime.log_stream import read_log_from_file, write_log_to_file
from apps.web_api.runtime.process_manager import (
    ProcessManager,
    build_process_manager,
)
from apps.web_api.runtime.process_registry import PROCESS_RUNTIME_REGISTRY, ProcessRuntimeRegistry
from apps.web_api.runtime.search_dispatch import (
    build_analysis_service,
    build_search_dispatch_runtime,
    build_search_request_submitter,
    SearchDispatchRuntime,
)
from apps.web_api.runtime.status_service import RuntimeStatusService
from apps.web_api.runtime.system_lifecycle import (
    SystemLifecycleService,
    build_default_system_lifecycle_dependencies,
    build_process_cleanup_port,
)
from apps.web_api.runtime.system_state import SYSTEM_STATE_REGISTRY, SystemStateRegistry
from apps.web_api.runtime.task_runtime_store import TASK_RUNTIME_STORE, TaskRuntimeStore
from backend import research_tasks as research_backend
from services.application.analysis import AnalysisRunQueryService, AnalysisService
from services.application.report import ReportJobQueryService
from services.application.research import ResearchTaskService, ResearchTaskViewService
from services.shared.timeline import TaskTimelineTracker, create_task_timeline_tracker, InMemoryTaskTimelineStore
from utils.runtime_paths import LOGS_DIR, ensure_runtime_dirs

# Zero-argument callable returning the frontend dev-server URL as a string, or None.
FrontendDevUrlResolver = Callable[[], str | None]
# Callable that submits a search request and returns a (dict payload, int) pair.
# NOTE(review): the int is presumably an HTTP status code — confirm against the route adapters.
SearchRequestSubmitter = Callable[..., tuple[dict[str, Any], int]]
# Zero-argument callback that performs runtime cleanup and returns nothing.
CleanupHandler = Callable[[], None]


@dataclass(frozen=True)
class RuntimeServices:
    """Shared runtime services used by the web API entrypoint.

    Immutable (frozen) bundle assembled by ``build_runtime_services`` and read
    by the dependency builders in this module.
    """

    # Log directory forwarded to the search submitter, lifecycle service and HTTP routes.
    log_dir: Path
    # Engine registry: the injected one, or the module-level ENGINE_RUNTIME_REGISTRY.
    engine_registry: EngineRuntimeRegistry
    # Forum runtime exposing start/stop and output/log accessors to the HTTP routes.
    forum_runtime: ForumRuntime
    # Process registry taken from the process manager's ``process_registry`` attribute.
    process_registry: ProcessRuntimeRegistry
    # Process manager used for streamlit app start/stop/wait and process cleanup.
    process_manager: ProcessManager
    # System state registry: the injected one, or the module-level SYSTEM_STATE_REGISTRY.
    system_state_registry: SystemStateRegistry
    # Task runtime store: the injected one, or the module-level TASK_RUNTIME_STORE.
    task_runtime_store: TaskRuntimeStore
    # Source of the process-status snapshot and system-status payload callables.
    runtime_status_service: RuntimeStatusService
    # Timeline tracker handed to the research task service.
    task_timeline_tracker: TaskTimelineTracker
    # Research task service wrapping the research task store.
    research_task_service: ResearchTaskService
    # Analysis service built from the research task service, engine registry and task store.
    analysis_service: AnalysisService
    # Search dispatch runtime built on top of the analysis service.
    search_dispatch_runtime: SearchDispatchRuntime
    # System lifecycle service wired to Socket.IO emit/stop.
    system_lifecycle: SystemLifecycleService
    # Zero-argument callback that cleans up managed processes.
    cleanup_handler: CleanupHandler


def _build_cleanup_handler(
    *,
    process_manager: ProcessManager,
    forum_runtime: ForumRuntime,
    system_state_registry: SystemStateRegistry,
) -> CleanupHandler:
    """Return the zero-argument callback that tears down managed processes.

    The same callback serves both the atexit hook and the keyboard-interrupt
    path. Nothing is cleaned up when the handler is built; the work happens
    each time the returned callable is invoked.
    """

    def run_cleanup() -> None:
        # The cleanup port is assembled per invocation, so
        # ``forum_runtime.stop_engine`` is looked up at call time.
        cleanup_port = build_process_cleanup_port(
            stop_forum_engine=forum_runtime.stop_engine,
            system_state_registry=system_state_registry,
        )
        process_manager.cleanup_processes(cleanup=cleanup_port)

    return run_cleanup


def _build_status_refresher(
    *,
    process_manager: ProcessManager,
) -> Callable[[], None]:
    """Return a zero-argument callable that refreshes app status via the process manager.

    Nothing is invoked at build time; every call of the returned function is
    forwarded to ``process_manager.check_app_status()``.
    """

    def refresh_status():
        # Pass the manager's result through to the caller unchanged.
        return process_manager.check_app_status()

    return refresh_status


def build_http_route_search_request_submitter(
    *,
    runtime_services: RuntimeServices,
    search_hooks: SearchHookSource,
) -> SearchRequestSubmitter:
    """Create the search-request submitter handed to route-level HTTP adapters.

    Hook bindings are derived from ``search_hooks``; every other collaborator
    (task service, registries, log directory, analysis service) is taken from
    ``runtime_services``.
    """

    hooks = build_search_hook_bindings(search_hooks)
    # Status refresh is delegated to the process manager held by the runtime bundle.
    refresh_status = _build_status_refresher(
        process_manager=runtime_services.process_manager,
    )

    return build_search_request_submitter(
        research_task_service=runtime_services.research_task_service,
        resolve_search_query=hooks.resolve_search_query,
        dispatch_search_request=hooks.dispatch_search_request,
        check_app_status=refresh_status,
        log_dir=runtime_services.log_dir,
        write_log=write_log_to_file,
        process_registry=runtime_services.process_registry,
        analysis_service=runtime_services.analysis_service,
    )


def _resolve_report_query_service() -> ReportJobQueryService | None:
    """Resolve the runtime-wired report query service when the report engine is available.

    Returns:
        The ``REPORT_QUERY_SERVICE`` object exported by
        ``services.engines.report.flask_interface``, or ``None`` when that
        module cannot be imported.

    The lookup is best-effort and never raises, but the reason for a failed
    import is logged so that a broken report engine is not disabled silently.
    """

    # Function-scope import keeps this helper self-contained.
    import logging

    logger = logging.getLogger(__name__)
    try:
        from services.engines.report.flask_interface import REPORT_QUERY_SERVICE
    except ImportError as exc:
        # The engine module (or something it imports) is not importable.
        logger.info("Report engine unavailable; report query service disabled: %s", exc)
        return None
    except Exception:
        # The engine raised while initialising at import time: keep the web API
        # running, but record the traceback instead of hiding the failure.
        logger.warning(
            "Report engine failed during import; report query service disabled.",
            exc_info=True,
        )
        return None
    return REPORT_QUERY_SERVICE


def _build_task_report_resource(report_app_service: Any, **kwargs: Any) -> Any:
    """Build a task-scoped report resource through whichever seam the service offers.

    Lookup order:
      1. ``get_task_report_resource_dto`` (typed seam)
      2. ``build_task_report_resource_payload`` (legacy payload seam)
      3. the service object itself, when it is callable

    All keyword arguments are forwarded unchanged to the chosen builder.

    Raises:
        TypeError: when none of the three seams is callable.
    """

    # Named builder methods take priority over calling the service directly.
    for method_name in (
        "get_task_report_resource_dto",
        "build_task_report_resource_payload",
    ):
        candidate = getattr(report_app_service, method_name, None)
        if callable(candidate):
            return candidate(**kwargs)

    if not callable(report_app_service):
        raise TypeError("Report task resource builder must be callable or expose a supported builder method.")

    return report_app_service(**kwargs)


def configure_research_route_services(
    *,
    analysis_query_service: AnalysisRunQueryService | None = None,
    report_query_service: ReportJobQueryService | None = None,
) -> None:
    """Wire the research route module with its analysis, report and task-view facades.

    A fresh ``AnalysisRunQueryService`` is used when no analysis query service
    is supplied. The report query service is forwarded exactly as given, which
    may be ``None``.
    """

    # Imported inside the function rather than at module load time.
    from backend import research_routes

    if analysis_query_service is None:
        analysis_query_service = AnalysisRunQueryService()
    research_routes.configure_research_analysis_app_service(analysis_query_service)
    research_routes.configure_report_app_service(report_query_service)

    # Every builder below reads its app service from ``research_routes`` when it
    # is called, so it uses whatever is configured on that module at that time.
    def build_analysis_run(**kwargs: Any) -> Any:
        return research_routes.RESEARCH_ANALYSIS_APP_SERVICE.get_task_analysis_run_dto(
            research_task_id=str(kwargs.get("research_task_id", "")),
            analysis_run_id=kwargs.get("analysis_run_id"),
        )

    def build_analysis_resource(**kwargs: Any) -> Any:
        return research_routes.RESEARCH_ANALYSIS_APP_SERVICE.get_task_analysis_resource_dto(**kwargs)

    def build_analysis_runs(**kwargs: Any) -> Any:
        return research_routes.RESEARCH_ANALYSIS_APP_SERVICE.list_task_analysis_run_dtos(
            research_task_id=str(kwargs.get("research_task_id", ""))
        )

    def build_crawler_resource(**kwargs: Any) -> Any:
        return research_routes.RESEARCH_CRAWLER_APP_SERVICE.build_task_crawler_resource_payload(**kwargs)

    def build_report_resource(**kwargs: Any) -> Any:
        return _build_task_report_resource(
            research_routes.RESEARCH_REPORT_APP_SERVICE,
            **kwargs,
        )

    task_view_service = ResearchTaskViewService(
        task_service=research_routes._ResearchTaskAppServiceProxy(),
        analysis_run_builder=build_analysis_run,
        analysis_resource_builder=build_analysis_resource,
        analysis_runs_builder=build_analysis_runs,
        crawler_resource_builder=build_crawler_resource,
        report_resource_builder=build_report_resource,
    )
    research_routes.configure_research_task_view_app_service(task_view_service)


def build_runtime_services(
    socketio: SocketIO,
    *,
    log_dir: Path = LOGS_DIR,
    engine_registry: EngineRuntimeRegistry | None = None,
    forum_runtime: ForumRuntime | None = None,
    process_registry: ProcessRuntimeRegistry | None = None,
    process_manager: ProcessManager | None = None,
    system_state_registry: SystemStateRegistry | None = None,
    task_runtime_store: TaskRuntimeStore | None = None,
    research_task_store: Any | None = None,
) -> RuntimeServices:
    """Assemble the ``RuntimeServices`` bundle for the web API.

    Each keyword argument left as ``None`` falls back to the corresponding
    module-level registry/store or to a freshly built component. Besides
    returning the bundle, this call ensures the runtime directories exist and
    configures the research route module with the analysis/report query services.
    """
    ensure_runtime_dirs()

    # Resolve injected collaborators, falling back to the shared defaults.
    engines = ENGINE_RUNTIME_REGISTRY if engine_registry is None else engine_registry
    seed_process_registry = (
        PROCESS_RUNTIME_REGISTRY if process_registry is None else process_registry
    )
    system_state = (
        SYSTEM_STATE_REGISTRY if system_state_registry is None else system_state_registry
    )
    task_store = TASK_RUNTIME_STORE if task_runtime_store is None else task_runtime_store
    if research_task_store is None:
        research_task_store = research_backend.research_task_service

    # A process manager is only built when none was injected.
    manager = process_manager
    if manager is None:
        manager = build_process_manager(
            process_registry=seed_process_registry,
            streamlit_scripts=engines.streamlit_scripts(),
        )
    # From here on the manager's own registry is used everywhere, even when a
    # separate ``process_registry`` was passed alongside an injected manager.
    active_process_registry = manager.process_registry

    forum = forum_runtime
    if forum is None:
        forum = build_forum_runtime(process_registry=active_process_registry)

    status_service = RuntimeStatusService(
        process_registry=active_process_registry,
        system_state_registry=system_state,
        refresh_process_status=_build_status_refresher(process_manager=manager),
    )

    timeline_tracker = create_task_timeline_tracker(
        timeline_store=InMemoryTaskTimelineStore(),
        source="apps.web_api.runtime.research_tasks",
    )
    task_service = ResearchTaskService(
        research_task_store,
        timeline_tracker=timeline_tracker,
    )
    analysis = build_analysis_service(
        research_task_service=task_service,
        engine_registry=engines,
        task_runtime_store=task_store,
    )
    analysis_queries = AnalysisRunQueryService()
    search_dispatch = build_search_dispatch_runtime(analysis_service=analysis)
    report_queries = _resolve_report_query_service()

    lifecycle = SystemLifecycleService(
        log_dir=log_dir,
        emit_output=socketio.emit,
        stop_socketio=socketio.stop,
        process_registry=active_process_registry,
        system_state_registry=system_state,
        dependencies=build_default_system_lifecycle_dependencies(
            process_manager=manager,
            forum_runtime=forum,
            process_registry=active_process_registry,
            engine_registry=engines,
        ),
    )

    # Route-facing facades are configured once all services above exist.
    configure_research_route_services(
        analysis_query_service=analysis_queries,
        report_query_service=report_queries,
    )

    cleanup = _build_cleanup_handler(
        process_manager=manager,
        forum_runtime=forum,
        system_state_registry=system_state,
    )
    return RuntimeServices(
        log_dir=log_dir,
        engine_registry=engines,
        forum_runtime=forum,
        process_registry=active_process_registry,
        process_manager=manager,
        system_state_registry=system_state,
        task_runtime_store=task_store,
        runtime_status_service=status_service,
        task_timeline_tracker=timeline_tracker,
        research_task_service=task_service,
        analysis_service=analysis,
        search_dispatch_runtime=search_dispatch,
        system_lifecycle=lifecycle,
        cleanup_handler=cleanup,
    )


def build_http_route_dependencies(
    *,
    runtime_services: RuntimeServices,
    frontend_dev_server_url: FrontendDevUrlResolver,
    socketio: SocketIO,
    submit_search_request: SearchRequestSubmitter,
) -> HttpRouteDependencies:
    """Collect everything the HTTP routes need into one ``HttpRouteDependencies`` bundle.

    Status, process and forum callables are taken from ``runtime_services``;
    the URL resolver, Socket.IO emitter and search submitter come from the
    remaining arguments.
    """
    status_service = runtime_services.runtime_status_service
    manager = runtime_services.process_manager
    forum = runtime_services.forum_runtime

    return HttpRouteDependencies(
        frontend_dev_server_url=frontend_dev_server_url,
        log_dir=runtime_services.log_dir,
        read_log=read_log_from_file,
        write_log=write_log_to_file,
        socket_emit=socketio.emit,
        get_process_status=status_service.get_process_status_snapshot,
        get_system_status=status_service.get_system_status_payload,
        system_lifecycle=runtime_services.system_lifecycle,
        streamlit_scripts=manager.streamlit_scripts,
        start_streamlit_app=manager.start_streamlit_app,
        wait_for_app_startup=manager.wait_for_app_startup,
        stop_streamlit_app=manager.stop_streamlit_app,
        start_forum_engine=forum.start_engine,
        stop_forum_engine=forum.stop_engine,
        get_forum_output=forum.get_output,
        get_forum_log_payload=forum.get_log_payload,
        get_forum_log_history=forum.get_log_history,
        submit_search_request=submit_search_request,
    )


def build_socket_event_dependencies(
    *,
    runtime_services: RuntimeServices,
) -> SocketEventDependencies:
    """Collect what the Socket.IO event handlers need from the runtime services.

    Only the process-status snapshot callable of the runtime status service is exposed.
    """
    status_service = runtime_services.runtime_status_service
    return SocketEventDependencies(
        get_process_status=status_service.get_process_status_snapshot,
    )


# Public API of this module; the underscore-prefixed helpers are not exported.
__all__ = [
    "RuntimeServices",
    "configure_research_route_services",
    "build_http_route_search_request_submitter",
    "build_http_route_dependencies",
    "build_socket_event_dependencies",
    "build_runtime_services",
]