# log_stream.py 4.39 KB
"""运行时日志流与进程输出转发。"""

from __future__ import annotations

import time
from datetime import datetime
from pathlib import Path
from typing import Callable

from loguru import logger


def write_log_to_file(log_dir: Path, app_name: str, line: str) -> None:
    """Append ``line`` plus a newline to ``<log_dir>/<app_name>.log``.

    The write is best-effort: any exception raised while building the path,
    opening the file, or writing is reported through the logger and then
    swallowed, so callers never see a failure from this function.
    """
    try:
        target = log_dir / f"{app_name}.log"
        with target.open("a", encoding="utf-8") as handle:
            handle.write(line + "\n")
            handle.flush()
    except Exception as exc:
        logger.error(f"Error writing log for {app_name}: {exc}")


def read_log_from_file(log_dir: Path, app_name: str, tail_lines: int | None = None) -> list[str]:
    """Return the non-blank lines of ``<log_dir>/<app_name>.log``.

    Args:
        log_dir: Directory that holds the per-app log files.
        app_name: App whose ``<app_name>.log`` file is read.
        tail_lines: When a positive integer, only the last ``tail_lines``
            non-blank lines are returned. ``None``, ``0`` and negative values
            all mean "no limit".

    Returns:
        The lines in file order with trailing newline characters removed;
        whitespace-only lines are skipped. An empty list is returned when the
        file does not exist, and also (after logging the exception) when it
        cannot be read.
    """
    # Function-scope import keeps this block self-contained.
    from collections import deque

    # Only a positive count limits the result. Without this guard a negative
    # value would reach slicing as a positive start index and silently drop
    # the *head* of the log instead of selecting its tail.
    limit = tail_lines if tail_lines and tail_lines > 0 else None

    try:
        log_file_path = log_dir / f"{app_name}.log"
        with open(log_file_path, "r", encoding="utf-8") as file:
            # Stream the file lazily instead of materialising every line.
            kept = (raw.rstrip("\n\r") for raw in file if raw.strip())
            if limit is None:
                return list(kept)
            # A bounded deque holds at most `limit` lines at any time, so
            # tailing a large log does not load the whole file into memory.
            return list(deque(kept, maxlen=limit))
    except FileNotFoundError:
        # A log that has not been created yet is not an error.
        return []
    except Exception as exc:
        logger.exception(f"Error reading log for {app_name}: {exc}")
        return []


def emit_console_output(
    emit_output: Callable[[str, dict], None],
    app_name: str,
    formatted_line: str,
) -> None:
    """Send one formatted log line to ``emit_output`` as a ``console_output`` event.

    The callback is invoked positionally with the event name and a payload
    dict carrying the app name under ``"app"`` and the text under ``"line"``.
    """
    payload = {"app": app_name, "line": formatted_line}
    emit_output("console_output", payload)


def append_and_emit_log(
    *,
    log_dir: Path,
    emit_output: Callable[[str, dict], None],
    app_name: str,
    line: str,
) -> None:
    """Timestamp ``line``, append it to the app log, then forward it.

    The line is prefixed with the current local wall-clock time as
    ``[HH:MM:SS]``. The same stamped text is written to the app's log file
    first and then passed to ``emit_output`` through ``emit_console_output``.
    """
    stamped = f"[{datetime.now():%H:%M:%S}] {line}"
    write_log_to_file(log_dir, app_name, stamped)
    emit_console_output(emit_output, app_name, stamped)


def flush_remaining_output(
    *,
    process,
    log_dir: Path,
    emit_output: Callable[[str, dict], None],
    app_name: str,
) -> None:
    """Read whatever is left on ``process.stdout`` and log each non-blank line.

    The remaining bytes are decoded as UTF-8 (undecodable bytes are replaced),
    split on ``"\\n"``, and every piece is stripped of surrounding whitespace.
    Pieces that are empty after stripping are dropped; the rest are handed to
    ``append_and_emit_log`` in order. Nothing happens when the read returns
    no data.
    """
    leftover = process.stdout.read()
    if leftover:
        text = leftover.decode("utf-8", errors="replace")
        # filter(None, ...) discards the pieces that stripped down to "".
        for entry in filter(None, map(str.strip, text.split("\n"))):
            append_and_emit_log(
                log_dir=log_dir,
                emit_output=emit_output,
                app_name=app_name,
                line=entry,
            )


def read_process_output(
    *,
    process,
    log_dir: Path,
    emit_output: Callable[[str, dict], None],
    app_name: str,
    poll_interval: float = 0.1,
) -> None:
    """Stream subprocess output into the log file.

    Loops until ``process.poll()`` reports that the process has exited,
    forwarding each non-blank stdout line (decoded as UTF-8 with replacement
    and stripped of surrounding whitespace) through ``append_and_emit_log``.
    Once the process has exited, the rest of stdout is drained with
    ``flush_remaining_output`` and the function returns.

    Args:
        process: Object exposing ``poll()`` and a bytes-mode ``stdout``
            (presumably a ``subprocess.Popen`` with piped stdout; verify
            against the caller).
        log_dir: Directory that holds the per-app log files.
        emit_output: Callback that receives each forwarded line as an event.
        app_name: Name used for the log file and the emitted events.
        poll_interval: Seconds to wait for readable output (``select``
            timeout) and to sleep when a read returns no data.

    Any exception raised inside the loop is logged, recorded in the app log
    file with a timestamp, and ends the loop; it is not re-raised.
    """
    import select
    import sys

    # select() on pipes is not available on Windows, so that platform relies
    # on a blocking readline() instead.
    use_select = sys.platform != "win32"

    while True:
        try:
            if process.poll() is not None:
                flush_remaining_output(
                    process=process,
                    log_dir=log_dir,
                    emit_output=emit_output,
                    app_name=app_name,
                )
                break

            if use_select:
                # NOTE(review): if process.stdout is a buffered reader,
                # readline() may pull more than one line into Python's
                # buffer, which select() cannot see; such lines would only
                # surface on a later readable event or in the exit flush.
                # Confirm how the caller opens the pipe.
                ready, _, _ = select.select([process.stdout], [], [], poll_interval)
                if not ready:
                    # select() already waited poll_interval; re-check exit.
                    continue

            output = process.stdout.readline()
            if not output:
                # Empty read means EOF: stdout is closed but the process has
                # not been reaped yet. After EOF select() reports the pipe as
                # readable immediately, so back off here instead of spinning
                # at full CPU until poll() notices the exit.
                time.sleep(poll_interval)
                continue

            line = output.decode("utf-8", errors="replace").strip()
            if line:
                append_and_emit_log(
                    log_dir=log_dir,
                    emit_output=emit_output,
                    app_name=app_name,
                    line=line,
                )
        except Exception as exc:
            error_msg = f"Error reading output for {app_name}: {exc}"
            logger.exception(error_msg)
            # Written straight to the file (not emitted to the console).
            write_log_to_file(
                log_dir,
                app_name,
                f"[{datetime.now().strftime('%H:%M:%S')}] {error_msg}",
            )
            break