forum_runtime.py 11 KB
"""Forum engine runtime helpers and object-oriented adapter."""

from __future__ import annotations

from pathlib import Path
from typing import Any, Callable

from loguru import logger

from apps.web_api.runtime.log_stream import read_log_from_file
from apps.web_api.runtime.process_registry import PROCESS_RUNTIME_REGISTRY, ProcessRuntimeRegistry
from services.shared.models import EngineExecutionError, EngineResult

ForumEmitOutput = Callable[[str, dict[str, Any]], None]
ForumMonitorStarter = Callable[[], bool]
ForumMonitorStopper = Callable[[], None]
ForumLogReader = Callable[[Path, str, int | None], list[str]]

_FORUM_ENGINE_NAME = "forum"
_FORUM_LOG_FILE_NAME = "forum.log"


def _default_start_forum_monitoring() -> bool:
    from services.engines.forum.monitor import start_forum_monitoring

    return start_forum_monitoring()


def _default_stop_forum_monitoring() -> None:
    from services.engines.forum.monitor import stop_forum_monitoring

    stop_forum_monitoring()


class ForumRuntime:
    """Object-oriented wrapper around forum runtime operations."""

    def __init__(
        self,
        *,
        process_registry: ProcessRuntimeRegistry | None = None,
        read_log: ForumLogReader = read_log_from_file,
        start_monitoring: ForumMonitorStarter = _default_start_forum_monitoring,
        stop_monitoring: ForumMonitorStopper = _default_stop_forum_monitoring,
    ) -> None:
        self._process_registry = process_registry
        self._read_log = read_log
        self._start_monitoring = start_monitoring
        self._stop_monitoring = stop_monitoring

    @property
    def process_registry(self) -> ProcessRuntimeRegistry:
        return self._process_registry if self._process_registry is not None else PROCESS_RUNTIME_REGISTRY

    def start_engine(self, *, emit_output: ForumEmitOutput | None = None) -> bool:
        try:
            logger.info("ForumEngine: starting forum monitoring")
            success = self._start_monitoring()
            if not success:
                raise RuntimeError("ForumEngine failed to start")

            self.process_registry.set_status(_FORUM_ENGINE_NAME, "running")
            if emit_output is not None:
                emit_output(
                    "console_output",
                    {
                        "app": _FORUM_ENGINE_NAME,
                        "line": "[SYSTEM] ForumEngine started",
                    },
                )
            return True
        except Exception as exc:
            self.process_registry.set_status(_FORUM_ENGINE_NAME, "error")
            logger.exception(f"ForumEngine failed to start: {exc}")
            raise

    def stop_engine(self, *, emit_output: ForumEmitOutput | None = None) -> None:
        try:
            logger.info("ForumEngine: stopping forum monitoring")
            self._stop_monitoring()
            self.process_registry.set_status(_FORUM_ENGINE_NAME, "stopped")
            if emit_output is not None:
                emit_output(
                    "console_output",
                    {
                        "app": _FORUM_ENGINE_NAME,
                        "line": "[SYSTEM] ForumEngine stopped",
                    },
                )
            logger.info("ForumEngine: forum monitoring stopped")
        except Exception as exc:
            self.process_registry.set_status(_FORUM_ENGINE_NAME, "error")
            logger.exception(f"ForumEngine failed to stop: {exc}")
            raise

    def get_output(self, *, log_dir: Path) -> dict[str, Any]:
        lines = self._read_log(log_dir, _FORUM_ENGINE_NAME)
        parsed_messages = self._parse_lines(lines)
        return {
            "success": True,
            "output": lines,
            "total_lines": len(lines),
            "engine_result": self._build_engine_result(
                log_dir=log_dir,
                lines=lines,
                parsed_messages=parsed_messages,
            ),
        }

    def get_log_payload(self, *, log_dir: Path) -> dict[str, Any]:
        lines = self._read_log(log_dir, _FORUM_ENGINE_NAME)
        parsed_messages = self._parse_lines(lines)
        return {
            "success": True,
            "log_lines": lines,
            "parsed_messages": parsed_messages,
            "total_lines": len(lines),
            "engine_result": self._build_engine_result(
                log_dir=log_dir,
                lines=lines,
                parsed_messages=parsed_messages,
            ),
        }

    def get_log_history(
        self,
        *,
        log_dir: Path,
        start_position: int = 0,
        max_lines: int = 1000,
    ) -> dict[str, Any]:
        forum_log_file = log_dir / _FORUM_LOG_FILE_NAME
        if not forum_log_file.exists():
            return {
                "success": True,
                "log_lines": [],
                "position": 0,
                "has_more": False,
                "engine_result": self._build_engine_result(log_dir=log_dir, lines=[]),
            }

        with open(forum_log_file, "r", encoding="utf-8", errors="ignore") as file:
            file.seek(start_position)
            lines: list[str] = []
            line_count = 0

            while line_count < max_lines:
                line = file.readline()
                if not line:
                    break
                normalized = line.rstrip("\n\r")
                if normalized.strip():
                    lines.append(normalized)
                    line_count += 1

            current_position = file.tell()
            file.seek(0, 2)
            end_position = file.tell()

        parsed_messages = self._parse_lines(lines)
        return {
            "success": True,
            "log_lines": lines,
            "position": current_position,
            "has_more": current_position < end_position,
            "engine_result": self._build_engine_result(
                log_dir=log_dir,
                lines=lines,
                parsed_messages=parsed_messages,
            ),
        }

    def _parse_lines(self, lines: list[str]) -> list[dict[str, str]]:
        return [parsed for parsed in (parse_forum_log_line(line) for line in lines) if parsed is not None]

    def _build_engine_result(
        self,
        *,
        log_dir: Path,
        lines: list[str],
        parsed_messages: list[dict[str, str]] | None = None,
    ) -> dict[str, Any]:
        parsed = list(parsed_messages) if parsed_messages is not None else self._parse_lines(lines)
        host_summary = self._extract_host_summary(parsed)
        runtime_status = self.process_registry.get_status(_FORUM_ENGINE_NAME).strip().lower()
        status = self._resolve_engine_status(runtime_status=runtime_status, host_summary=host_summary, parsed_messages=parsed)
        success = status != "failed"
        summary = self._resolve_engine_summary(
            status=status,
            parsed_messages=parsed,
            host_summary=host_summary,
        )

        participants = sorted({message["source"] for message in parsed if message.get("source")})
        artifacts: dict[str, Any] = {
            "parsed_messages": parsed,
            "participants": participants,
        }
        if host_summary:
            artifacts["host_summary"] = host_summary

        metrics = {
            "log_line_count": len(lines),
            "parsed_message_count": len(parsed),
            "participant_count": len(participants),
            "host_summary_count": 1 if host_summary else 0,
        }

        error = None
        if not success:
            error = EngineExecutionError(
                code="forum_runtime_error",
                message=summary,
                retryable=False,
                details={"runtime_status": runtime_status or "unknown"},
            )

        return EngineResult(
            engine_name=_FORUM_ENGINE_NAME,
            status=status,
            success=success,
            summary=summary,
            artifacts=artifacts,
            metrics=metrics,
            logs_ref=str(log_dir / _FORUM_LOG_FILE_NAME),
            error=error,
        ).to_runtime_payload()

    @staticmethod
    def _extract_host_summary(parsed_messages: list[dict[str, str]]) -> str:
        for message in reversed(parsed_messages):
            if message.get("source", "").strip().upper() != "HOST":
                continue
            content = str(message.get("content") or "").strip()
            if content:
                return content
        return ""

    @staticmethod
    def _resolve_engine_status(
        *,
        runtime_status: str,
        host_summary: str,
        parsed_messages: list[dict[str, str]],
    ) -> str:
        if runtime_status == "error":
            return "failed"
        if host_summary:
            return "completed"
        if runtime_status == "running":
            return "running"
        if runtime_status in {"stopped", "idle", "pending"}:
            return runtime_status
        if parsed_messages:
            return "running"
        return "idle"

    @staticmethod
    def _resolve_engine_summary(
        *,
        status: str,
        parsed_messages: list[dict[str, str]],
        host_summary: str,
    ) -> str:
        if host_summary:
            return host_summary
        if parsed_messages:
            latest_content = str(parsed_messages[-1].get("content") or "").strip()
            if latest_content:
                return latest_content
        if status == "running":
            return "Forum discussion is running"
        if status == "failed":
            return "Forum runtime reported an error"
        if status == "stopped":
            return "Forum discussion is stopped"
        return "Forum discussion has no log output yet"


def parse_forum_log_line(line: str) -> dict[str, str] | None:
    normalized = line.strip()
    if not normalized:
        return None

    if normalized.startswith("["):
        try:
            timestamp_end = normalized.index("]")
            timestamp = normalized[1:timestamp_end]
            remaining = normalized[timestamp_end + 1 :].strip()
            if remaining.startswith("["):
                source_end = remaining.index("]")
                source = remaining[1:source_end]
                content = remaining[source_end + 1 :].strip()
                return {
                    "timestamp": timestamp,
                    "source": source,
                    "content": content.replace("\\n", "\n").replace("\\r", "\r"),
                }
            return {
                "timestamp": timestamp,
                "source": "SYSTEM",
                "content": remaining.replace("\\n", "\n").replace("\\r", "\r"),
            }
        except ValueError:
            pass

    return {
        "timestamp": "",
        "source": "UNKNOWN",
        "content": normalized.replace("\\n", "\n").replace("\\r", "\r"),
    }


def build_forum_runtime(
    *,
    process_registry: ProcessRuntimeRegistry | None = None,
) -> ForumRuntime:
    """Build a forum runtime bound to the provided process registry."""

    return ForumRuntime(process_registry=process_registry)


__all__ = [
    "ForumRuntime",
    "build_forum_runtime",
    "parse_forum_log_line",
]