# report_engine_only.py
#!/usr/bin/env python
"""
Report Engine 命令行版本。

该工具不依赖前端,直接读取三大引擎最新 Markdown 报告并调用 Report Engine
生成综合 HTML 报告,可选继续导出 PDF 与 Markdown。
"""

from __future__ import annotations

import argparse
import json
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Optional

from loguru import logger

from utils.runtime_paths import (
    FINAL_REPORTS_DIR,
    INSIGHT_REPORTS_DIR,
    LOGS_DIR,
    MEDIA_REPORTS_DIR,
    QUERY_REPORTS_DIR,
    ensure_runtime_dirs,
)


# Directory three levels up from this file's resolved path.
# NOTE(review): the usage examples in parse_arguments() run this module as
# `tools.reports.report_engine_only`, which would make parents[2] the
# repository root — confirm if the file is ever moved.
PROJECT_ROOT = Path(__file__).resolve().parents[2]
# Put that directory at the front of sys.path (once) so the function-level
# `services.engines...` imports further down can be resolved.
# NOTE(review): this runs AFTER the top-of-file `utils.runtime_paths` import,
# so it cannot help that import resolve — presumably the launcher already has
# the root on sys.path; confirm.
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

# Sub-directories of FINAL_REPORTS_DIR for chapter output, document IR files,
# and the exported PDF / Markdown files.
CHAPTER_OUTPUT_DIR = FINAL_REPORTS_DIR / "chapters"
IR_OUTPUT_DIR = FINAL_REPORTS_DIR / "ir"
PDF_OUTPUT_DIR = FINAL_REPORTS_DIR / "pdf"
MARKDOWN_OUTPUT_DIR = FINAL_REPORTS_DIR / "md"

# Publish the output locations as environment variables. setdefault() leaves
# any value that is already present in the environment untouched.
# NOTE(review): presumably consumed by the Report Engine's configuration —
# verify against the engine's settings loader.
os.environ.setdefault("REPORT_ENGINE_OUTPUT_DIR", str(FINAL_REPORTS_DIR))
os.environ.setdefault("REPORT_ENGINE_CHAPTER_OUTPUT_DIR", str(CHAPTER_OUTPUT_DIR))
os.environ.setdefault("REPORT_ENGINE_DOCUMENT_IR_OUTPUT_DIR", str(IR_OUTPUT_DIR))
os.environ.setdefault("REPORT_ENGINE_LOG_FILE", str(LOGS_DIR / "report.log"))


def setup_logger(verbose: bool = False) -> None:
    """Replace the currently registered loguru handlers with one stdout sink.

    Args:
        verbose: When true the sink emits DEBUG and above; otherwise INFO
            and above.
    """
    line_format = (
        "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
        "<level>{level: <8}</level> | "
        "<level>{message}</level>"
    )
    if verbose:
        threshold = "DEBUG"
    else:
        threshold = "INFO"

    # Remove existing handlers first so the stdout sink below is the only one.
    logger.remove()
    logger.add(sys.stdout, format=line_format, level=threshold)


def sanitize_topic(value: str) -> str:
    """Turn a free-form topic into a short fragment usable in a file name.

    Only alphanumeric characters, spaces, hyphens and underscores are kept.
    Trailing whitespace is removed, the remaining spaces become underscores,
    and the result is cut to at most 30 characters.

    Args:
        value: The raw topic text.

    Returns:
        The sanitized fragment, or ``"report"`` when nothing is left.
    """
    allowed_extras = (" ", "-", "_")
    kept_chars = [ch for ch in value if ch.isalnum() or ch in allowed_extras]
    trimmed = "".join(kept_chars).rstrip()
    # Splitting on a literal space and re-joining maps every space to "_".
    underscored = "_".join(trimmed.split(" "))
    shortened = underscored[:30]
    if shortened:
        return shortened
    return "report"


def check_dependencies() -> tuple[bool, Optional[str]]:
    """Run step 1/4: ask the report engine whether PDF export is possible.

    The check itself is delegated to ``check_pango_available`` from the
    report engine's dependency helper, imported lazily inside the ``try`` so
    an import failure is reported like any other failure.

    Returns:
        A ``(is_available, message)`` pair as returned by the helper. If the
        import or the check raises, ``(False, <exception text>)`` is returned
        instead of propagating the error.
    """
    divider = "=" * 70
    logger.info(divider)
    logger.info("步骤 1/4: 检查 PDF 依赖")
    logger.info(divider)

    try:
        from services.engines.report.utils.dependency_check import check_pango_available

        pango_ok, detail = check_pango_available()
        if not pango_ok:
            logger.warning("PDF 依赖缺失,将跳过 PDF,仅保留 HTML/Markdown")
            # The helper may supply an explanatory message; show it if present.
            if detail:
                logger.info(detail)
        else:
            logger.success("PDF 依赖检查通过,将同时生成 HTML 和 PDF")
        return pango_ok, detail
    except Exception as exc:
        # Best effort: a broken check only disables PDF output.
        logger.error(f"依赖检查失败: {exc}")
        return False, str(exc)


def get_latest_engine_reports() -> dict[str, Path]:
    """Run step 2/4: find the newest Markdown report of each analysis engine.

    The insight, media and query report directories are each scanned for
    ``*.md`` entries and the one with the greatest modification time is
    selected. Engines whose directory is missing or holds no Markdown file
    are skipped with a warning.

    Returns:
        Mapping of engine name (``"insight"``, ``"media"``, ``"query"``) to
        the path of its newest report. Engines without a report are absent.

    Raises:
        SystemExit: With exit code 1 when no engine produced any report.
    """
    logger.info("\n" + "=" * 70)
    logger.info("步骤 2/4: 获取最新分析引擎报告")
    logger.info("=" * 70)

    ensure_runtime_dirs()
    directories = {
        "insight": INSIGHT_REPORTS_DIR,
        "media": MEDIA_REPORTS_DIR,
        "query": QUERY_REPORTS_DIR,
    }

    latest_files: dict[str, Path] = {}
    for engine, directory in directories.items():
        if not directory.exists():
            logger.warning(f"{engine.capitalize()} Engine 目录不存在: {directory}")
            continue

        # Keep regular files only: a dangling symlink would make stat() raise,
        # and a directory named "*.md" could be picked as the "latest report"
        # and then fail when its content is read.
        candidates = [item for item in directory.glob("*.md") if item.is_file()]
        if not candidates:
            logger.warning(f"{engine.capitalize()} Engine 目录中没有找到 .md 文件")
            continue

        # Only the newest entry is needed, so take the maximum instead of
        # sorting the whole listing. Ties resolve to the first entry in glob
        # order, the same as a stable descending sort.
        newest = max(candidates, key=lambda item: item.stat().st_mtime)
        latest_files[engine] = newest
        logger.info(f"找到 {engine.capitalize()} Engine 最新报告: {newest.name}")

    if not latest_files:
        logger.error("未找到任何引擎报告,请先运行分析引擎生成报告")
        raise SystemExit(1)

    logger.info(f"\n共找到 {len(latest_files)} 个引擎的最新报告")
    return latest_files


def confirm_file_selection(latest_files: dict[str, Path]) -> bool:
    """List the selected report files and ask the user to confirm them.

    Args:
        latest_files: Mapping of engine name to the report file chosen for it.

    Returns:
        ``True`` when the user presses Enter or answers ``y`` / ``yes``
        (case-insensitive). ``False`` for any other answer, or when the
        prompt is interrupted by Ctrl-C or end-of-input.
    """
    divider = "=" * 70
    logger.info("\n" + divider)
    logger.info("请确认以下文件选择")
    logger.info(divider)

    for engine_name, report_path in latest_files.items():
        modified_at = datetime.fromtimestamp(report_path.stat().st_mtime)
        logger.info(f"{engine_name.capitalize()} Engine:")
        logger.info(f"  文件名: {report_path.name}")
        logger.info(f"  路径: {report_path}")
        logger.info(f"  修改时间: {modified_at:%Y-%m-%d %H:%M:%S}")

    try:
        raw_answer = input("是否使用以上文件生成报告? [Y/n]: ")
    except (KeyboardInterrupt, EOFError):
        # Treat an aborted prompt exactly like an explicit "no".
        logger.warning("\n用户取消操作")
        return False

    answer = raw_answer.strip().lower()
    if answer not in ("", "y", "yes"):
        logger.warning("用户取消操作")
        return False

    logger.success("用户已确认,继续执行")
    return True


def load_engine_reports(latest_files: dict[str, Path]) -> list[str]:
    """Read the selected report files as UTF-8 text.

    A file that cannot be read is logged as an error and skipped, so the
    returned list may be shorter than ``latest_files``.

    Args:
        latest_files: Mapping of engine name to report path.

    Returns:
        The contents of every successfully read report, in mapping order.
    """
    loaded: list[str] = []
    for engine_name, report_path in latest_files.items():
        try:
            text = report_path.read_text(encoding="utf-8")
        except Exception as exc:
            # Best effort: one unreadable report must not block the others.
            logger.error(f"加载 {engine_name} 报告失败: {exc}")
        else:
            loaded.append(text)
            logger.debug(f"已加载 {engine_name} 报告,长度 {len(text)} 字符")
    return loaded


def extract_query_from_reports(latest_files: dict[str, Path]) -> str:
    """Derive a report topic from the file names of the selected reports.

    Each file stem is split on ``"_"``. With three or more segments the topic
    is everything between the first and the last segment; with exactly two
    segments it is the second one. The first non-empty topic found wins.

    Args:
        latest_files: Mapping of engine name to report path.

    Returns:
        The extracted topic, or ``"综合分析报告"`` when no file name yields one.
    """
    for report_path in latest_files.values():
        segments = report_path.stem.split("_")
        segment_count = len(segments)

        if segment_count == 2:
            if segments[1]:
                return segments[1]
            continue
        if segment_count < 3:
            continue

        # Drop the leading and the trailing segment and keep what lies between.
        middle = "_".join(segments[1:-1])
        if middle:
            return middle

    return "综合分析报告"


def generate_report(reports: list[str], query: str) -> dict[str, Any]:
    """Run step 3/4: hand the engine reports to ``ReportAgent``.

    Progress events emitted by the agent are translated into log lines by a
    local stream handler.

    Args:
        reports: Text contents of the engine reports to combine.
        query: Topic of the combined report.

    Returns:
        Whatever ``ReportAgent.generate_report`` returns.
    """
    divider = "=" * 70
    logger.info("\n" + divider)
    logger.info("步骤 3/4: 生成综合报告")
    logger.info(divider)
    logger.info(f"报告主题: {query}")
    logger.info(f"输入报告数: {len(reports)}")

    from services.engines.report.agent import ReportAgent

    # Stage name -> builder of the info-level message logged for that stage.
    stage_messages = {
        "agent_start": lambda data: f"开始生成报告: {data.get('report_id', '')}",
        "template_selected": lambda data: f"已选择模板: {data.get('template', '')}",
        "chapters_compiled": lambda data: f"章节生成完成,共 {data.get('chapter_count', 0)} 个章节",
        "html_rendered": lambda data: "HTML 渲染完成",
        "report_saved": lambda data: "报告文件已保存",
    }

    def stream_handler(event_type: str, payload: dict[str, Any]) -> None:
        """Log one progress event; unrecognised events are ignored."""
        if event_type == "error":
            logger.error(payload.get("message", "未知错误"))
            return

        if event_type == "chapter_status":
            title = payload.get("title", "")
            status = payload.get("status", "")
            if status == "generating":
                logger.info(f"正在生成章节: {title}")
            elif status == "completed":
                logger.success(f"章节完成: {title}")
            return

        if event_type != "stage":
            return

        build_message = stage_messages.get(payload.get("stage", ""))
        if build_message is not None:
            logger.info(build_message(payload))

    outcome = ReportAgent().generate_report(
        query=query,
        reports=reports,
        forum_logs="",
        custom_template="",
        save_report=True,
        stream_handler=stream_handler,
    )
    logger.success("综合报告生成成功")
    return outcome


def load_document_ir(path: str | Path) -> dict[str, Any]:
    ir_path = Path(path)
    with ir_path.open("r", encoding="utf-8") as file:
        return json.load(file)


def save_pdf(document_ir_path: str, query: str) -> Optional[str]:
    """Render the document IR to a PDF file under ``PDF_OUTPUT_DIR``.

    The file is named ``final_report_<sanitized topic>_<timestamp>.pdf``.

    Args:
        document_ir_path: Path of the document IR JSON file to render.
        query: Report topic, used for the file name.

    Returns:
        The path reported by the renderer as a string, or ``None`` when any
        step fails (the failure is logged with its traceback).
    """
    logger.info("\n正在生成 PDF 文件...")

    try:
        from services.engines.report.renderers import PDFRenderer

        ir_payload = load_document_ir(document_ir_path)
        pdf_renderer = PDFRenderer()

        ensure_runtime_dirs()
        PDF_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

        topic_slug = sanitize_topic(query)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        target_path = PDF_OUTPUT_DIR / f"final_report_{topic_slug}_{timestamp}.pdf"

        # Size and location are reported from the path the renderer returns,
        # not from the path that was requested.
        written_path = pdf_renderer.render_to_pdf(
            ir_payload,
            target_path,
            optimize_layout=True,
            ir_file_path=document_ir_path,
        )

        bytes_per_mb = 1024 * 1024
        size_mb = written_path.stat().st_size / bytes_per_mb
        logger.success(f"PDF 已保存: {written_path}")
        logger.info(f"文件大小: {size_mb:.2f} MB")
        return str(written_path)
    except Exception as exc:
        # Best effort: a failed PDF export must not abort the whole run.
        logger.exception(f"PDF 生成失败: {exc}")
        return None


def save_markdown(document_ir_path: str, query: str) -> Optional[str]:
    """Render the document IR to a Markdown file under ``MARKDOWN_OUTPUT_DIR``.

    The file is named ``final_report_<sanitized topic>_<timestamp>.md`` and
    written as UTF-8.

    Args:
        document_ir_path: Path of the document IR JSON file to render.
        query: Report topic, used for the file name.

    Returns:
        The path of the written file as a string, or ``None`` when any step
        fails (the failure is logged with its traceback).
    """
    logger.info("\n正在生成 Markdown 文件...")

    try:
        from services.engines.report.renderers import MarkdownRenderer

        ir_payload = load_document_ir(document_ir_path)
        rendered_text = MarkdownRenderer().render(ir_payload, ir_file_path=document_ir_path)

        ensure_runtime_dirs()
        MARKDOWN_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

        topic_slug = sanitize_topic(query)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        target_path = MARKDOWN_OUTPUT_DIR / f"final_report_{topic_slug}_{timestamp}.md"
        target_path.write_text(rendered_text, encoding="utf-8")

        size_kb = target_path.stat().st_size / 1024
        logger.success(f"Markdown 已保存: {target_path}")
        logger.info(f"文件大小: {size_kb:.1f} KB")
        return str(target_path)
    except Exception as exc:
        # Best effort: a failed Markdown export must not abort the whole run.
        logger.exception(f"Markdown 生成失败: {exc}")
        return None


def parse_arguments(argv: Optional[list[str]] = None) -> argparse.Namespace:
    """Parse the command-line options of the tool.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv[1:]``.

    Returns:
        Namespace with ``query`` (``str`` or ``None``) and the boolean flags
        ``skip_pdf``, ``skip_markdown`` and ``verbose``.
    """
    example_lines = (
        "示例:",
        "  python -m tools.reports.report_engine_only",
        '  python -m tools.reports.report_engine_only --query "博物馆品牌分析"',
        "  python -m tools.reports.report_engine_only --skip-pdf --verbose",
    )
    # The raw formatter prints the epilog verbatim, keeping the line breaks.
    parser = argparse.ArgumentParser(
        description="Report Engine 命令行版本",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="\n".join(example_lines) + "\n",
    )
    parser.add_argument("--query", type=str, default=None, help="指定报告主题,默认从文件名自动提取")

    boolean_switches = (
        ("--skip-pdf", "跳过 PDF 生成"),
        ("--skip-markdown", "跳过 Markdown 生成"),
        ("--verbose", "输出详细日志"),
    )
    for switch, help_text in boolean_switches:
        parser.add_argument(switch, action="store_true", help=help_text)

    return parser.parse_args(argv)


def main(argv: Optional[list[str]] = None) -> int:
    """Run the full command-line workflow.

    Steps: check PDF dependencies, pick the newest engine reports, ask for
    confirmation, generate the combined report, then export PDF and Markdown
    from the document IR unless they are skipped or unavailable.

    Args:
        argv: Command-line arguments; ``None`` means ``sys.argv[1:]``.

    Returns:
        ``0`` on completion or when the user declines the file selection,
        ``1`` when none of the selected reports could be loaded.
    """
    args = parse_arguments(argv)
    setup_logger(verbose=args.verbose)
    ensure_runtime_dirs()

    divider = "=" * 70
    logger.info("Report Engine 命令行工具启动")

    # The dependency check always runs; --skip-pdf only overrides its result.
    pdf_available = check_dependencies()[0]
    if args.skip_pdf:
        logger.info("用户指定跳过 PDF 生成")
        pdf_available = False

    if args.skip_markdown:
        logger.info("用户指定跳过 Markdown 生成")
    markdown_enabled = not args.skip_markdown

    latest_files = get_latest_engine_reports()
    if not confirm_file_selection(latest_files):
        return 0

    reports = load_engine_reports(latest_files)
    if not reports:
        logger.error("未能加载任何报告内容")
        return 1

    # An explicit --query wins; otherwise the topic comes from the file names.
    query = args.query if args.query else extract_query_from_reports(latest_files)
    result = generate_report(reports, query)

    logger.info("\n" + divider)
    logger.info("步骤 4/4: 保存生成文件")
    logger.info(divider)

    html_path = result.get("report_filepath", "")
    ir_path = result.get("ir_filepath", "")

    def ir_file_ready() -> bool:
        """Tell whether the agent reported an IR file that exists on disk."""
        return bool(ir_path) and Path(ir_path).exists()

    if html_path:
        logger.success(f"HTML 已保存: {result.get('report_relative_path', html_path)}")

    pdf_path = None
    if not pdf_available:
        logger.info("已跳过 PDF 生成")
    elif ir_file_ready():
        pdf_path = save_pdf(ir_path, query)
    else:
        logger.warning("未找到 IR 文件,无法生成 PDF")

    markdown_path = None
    if not markdown_enabled:
        logger.info("已跳过 Markdown 生成")
    elif ir_file_ready():
        markdown_path = save_markdown(ir_path, query)
    else:
        logger.warning("未找到 IR 文件,无法生成 Markdown")

    logger.info("\n" + divider)
    logger.success("报告生成完成")
    logger.info(divider)
    logger.info(f"报告 ID: {result.get('report_id', 'N/A')}")
    logger.info(f"HTML 文件: {result.get('report_relative_path', 'N/A')}")
    logger.info(f"PDF 文件: {pdf_path or '已跳过或生成失败'}")
    logger.info(f"Markdown 文件: {markdown_path or '已跳过或生成失败'}")
    return 0


# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())