template_selection_node.py 9.53 KB
"""
模板选择节点
根据查询内容和可用模板选择最合适的报告模板
"""

import os
import json
from typing import Dict, Any, List, Optional
from loguru import logger

from .base_node import BaseNode
from ..prompts import SYSTEM_PROMPT_TEMPLATE_SELECTION


class TemplateSelectionNode(BaseNode):
    """模板选择处理节点"""
    
    def __init__(self, llm_client, template_dir: str = "ReportEngine/report_template"):
        """
        初始化模板选择节点
        
        Args:
            llm_client: LLM客户端
            template_dir: 模板目录路径
        """
        super().__init__(llm_client, "TemplateSelectionNode")
        self.template_dir = template_dir
        
    def run(self, input_data: Dict[str, Any], **kwargs) -> Dict[str, Any]:
        """
        执行模板选择
        
        Args:
            input_data: 包含查询和报告内容的字典
                - query: 原始查询
                - reports: 三个子agent的报告列表
                - forum_logs: 论坛日志内容
                
        Returns:
            选择的模板信息
        """
        logger.info("开始模板选择...")
        
        query = input_data.get('query', '')
        reports = input_data.get('reports', [])
        forum_logs = input_data.get('forum_logs', '')
        
        # 获取可用模板
        available_templates = self._get_available_templates()
        
        if not available_templates:
            logger.info("未找到预设模板,使用内置默认模板")
            return self._get_fallback_template()
        
        # 使用LLM进行模板选择
        try:
            llm_result = self._llm_template_selection(query, reports, forum_logs, available_templates)
            if llm_result:
                return llm_result
        except Exception as e:
            logger.exception(f"LLM模板选择失败: {str(e)}")
        
        # 如果LLM选择失败,使用备选方案
        return self._get_fallback_template()
    

    
    def _llm_template_selection(self, query: str, reports: List[Any], forum_logs: str, 
                              available_templates: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """使用LLM进行模板选择"""
        logger.info("尝试使用LLM进行模板选择...")
        
        # 构建模板列表
        template_list = "\n".join([f"- {t['name']}: {t['description']}" for t in available_templates])
        
        # 构建报告内容摘要
        reports_summary = ""
        if reports:
            reports_summary = "\n\n=== 分析引擎报告内容 ===\n"
            for i, report in enumerate(reports, 1):
                # 获取报告内容,支持不同的数据格式
                if isinstance(report, dict):
                    content = report.get('content', str(report))
                elif hasattr(report, 'content'):
                    content = report.content
                else:
                    content = str(report)
                
                # 截断过长的内容,保留前1000个字符
                if len(content) > 1000:
                    content = content[:1000] + "...(内容已截断)"
                
                reports_summary += f"\n报告{i}内容:\n{content}\n"
        
        # 构建论坛日志摘要
        forum_summary = ""
        if forum_logs and forum_logs.strip():
            forum_summary = "\n\n=== 三个引擎的讨论内容 ===\n"
            # 截断过长的日志内容,保留前800个字符
            if len(forum_logs) > 800:
                forum_content = forum_logs[:800] + "...(讨论内容已截断)"
            else:
                forum_content = forum_logs
            forum_summary += forum_content
        
        user_message = f"""查询内容: {query}

报告数量: {len(reports)} 个分析引擎报告
论坛日志: {'有' if forum_logs else '无'}
{reports_summary}{forum_summary}

可用模板:
{template_list}

请根据查询内容、报告内容和论坛日志的具体情况,选择最合适的模板。"""
        
        # 调用LLM
        response = self.llm_client.stream_invoke_to_string(SYSTEM_PROMPT_TEMPLATE_SELECTION, user_message)
        
        # 检查响应是否为空
        if not response or not response.strip():
            logger.error("LLM返回空响应")
            return None
        
        logger.info(f"LLM原始响应: {response}")
        
        # 尝试解析JSON响应
        try:
            # 清理响应文本
            cleaned_response = self._clean_llm_response(response)
            result = json.loads(cleaned_response)
            
            # 验证选择的模板是否存在
            selected_template_name = result.get('template_name', '')
            for template in available_templates:
                if template['name'] == selected_template_name or selected_template_name in template['name']:
                    logger.info(f"LLM选择模板: {selected_template_name}")
                    return {
                        'template_name': template['name'],
                        'template_content': template['content'],
                        'selection_reason': result.get('selection_reason', 'LLM智能选择')
                    }
            
            logger.error(f"LLM选择的模板不存在: {selected_template_name}")
            return None
            
        except json.JSONDecodeError as e:
            logger.error(f"JSON解析失败: {str(e)}")
            # 尝试从文本响应中提取模板信息
            return self._extract_template_from_text(response, available_templates)
    
    def _clean_llm_response(self, response: str) -> str:
        """清理LLM响应"""
        # 移除可能的markdown代码块标记
        if '```json' in response:
            response = response.split('```json')[1].split('```')[0]
        elif '```' in response:
            response = response.split('```')[1].split('```')[0]
        
        # 移除前后空白
        response = response.strip()
        
        return response
    
    def _extract_template_from_text(self, response: str, available_templates: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """从文本响应中提取模板信息"""
        logger.info("尝试从文本响应中提取模板信息")
        
        # 查找响应中是否包含模板名称
        for template in available_templates:
            template_name_variants = [
                template['name'],
                template['name'].replace('.md', ''),
                template['name'].replace('模板', ''),
            ]
            
            for variant in template_name_variants:
                if variant in response:
                    logger.info(f"在响应中找到模板: {template['name']}")
                    return {
                        'template_name': template['name'],
                        'template_content': template['content'],
                        'selection_reason': '从文本响应中提取'
                    }
        
        return None
    
    def _get_available_templates(self) -> List[Dict[str, Any]]:
        """获取可用的模板列表"""
        templates = []
        
        if not os.path.exists(self.template_dir):
            logger.error(f"模板目录不存在: {self.template_dir}")
            return templates
        
        # 查找所有markdown模板文件
        for filename in os.listdir(self.template_dir):
            if filename.endswith('.md'):
                template_path = os.path.join(self.template_dir, filename)
                try:
                    with open(template_path, 'r', encoding='utf-8') as f:
                        content = f.read()
                    
                    template_name = filename.replace('.md', '')
                    description = self._extract_template_description(template_name)
                    
                    templates.append({
                        'name': template_name,
                        'path': template_path,
                        'content': content,
                        'description': description
                    })
                except Exception as e:
                    logger.exception(f"读取模板文件失败 {filename}: {str(e)}")
        
        return templates
    
    def _extract_template_description(self, template_name: str) -> str:
        """根据模板名称生成描述"""
        if '企业品牌' in template_name:
            return "适用于企业品牌声誉和形象分析"
        elif '市场竞争' in template_name:
            return "适用于市场竞争格局和对手分析"
        elif '日常' in template_name or '定期' in template_name:
            return "适用于日常监测和定期汇报"
        elif '政策' in template_name or '行业' in template_name:
            return "适用于政策影响和行业动态分析"
        elif '热点' in template_name or '社会' in template_name:
            return "适用于社会热点和公共事件分析"
        elif '突发' in template_name or '危机' in template_name:
            return "适用于突发事件和危机公关"
        
        return "通用报告模板"
    

    
    def _get_fallback_template(self) -> Dict[str, Any]:
        """获取备用默认模板(空模板,让LLM自行发挥)"""
        logger.info("未找到合适模板,使用空模板让LLM自行发挥")
        
        return {
            'template_name': '自由发挥模板',
            'template_content': '',
            'selection_reason': '未找到合适的预设模板,让LLM根据内容自行设计报告结构'
        }