666ghj

Improve forum communication mechanism between agents.

... ... @@ -131,7 +131,12 @@ class LogMonitor:
"已更新段落",
"正在生成",
"开始处理",
"处理完成"
"处理完成",
"已读取HOST发言",
"读取HOST发言失败",
"未找到HOST发言",
"调试输出",
"信息记录"
]
for pattern in exclude_patterns:
... ...
... ... @@ -18,6 +18,17 @@ from ..utils.text_processing import (
format_search_results_for_prompt
)
# 导入论坛读取工具
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
try:
from utils.forum_reader import get_latest_host_speech, format_host_speech_for_prompt
FORUM_READER_AVAILABLE = True
except ImportError:
FORUM_READER_AVAILABLE = False
print("警告: 无法导入forum_reader模块,将跳过HOST发言读取功能")
class FirstSummaryNode(StateMutationNode):
"""根据搜索结果生成段落首次总结的节点"""
... ... @@ -62,9 +73,28 @@ class FirstSummaryNode(StateMutationNode):
# 准备输入数据
if isinstance(input_data, str):
message = input_data
data = json.loads(input_data)
else:
message = json.dumps(input_data, ensure_ascii=False)
data = input_data.copy() if isinstance(input_data, dict) else input_data
# 读取最新的HOST发言(如果可用)
if FORUM_READER_AVAILABLE:
try:
host_speech = get_latest_host_speech()
if host_speech:
# 将HOST发言添加到输入数据中
data['host_speech'] = host_speech
self.log_info(f"已读取HOST发言,长度: {len(host_speech)}字符")
except Exception as e:
self.log_info(f"读取HOST发言失败: {str(e)}")
# 转换为JSON字符串
message = json.dumps(data, ensure_ascii=False)
# 如果有HOST发言,添加到消息前面作为参考
if FORUM_READER_AVAILABLE and 'host_speech' in data and data['host_speech']:
formatted_host = format_host_speech_for_prompt(data['host_speech'])
message = formatted_host + "\n" + message
self.log_info("正在生成首次段落总结")
... ... @@ -208,9 +238,28 @@ class ReflectionSummaryNode(StateMutationNode):
# 准备输入数据
if isinstance(input_data, str):
message = input_data
data = json.loads(input_data)
else:
message = json.dumps(input_data, ensure_ascii=False)
data = input_data.copy() if isinstance(input_data, dict) else input_data
# 读取最新的HOST发言(如果可用)
if FORUM_READER_AVAILABLE:
try:
host_speech = get_latest_host_speech()
if host_speech:
# 将HOST发言添加到输入数据中
data['host_speech'] = host_speech
self.log_info(f"已读取HOST发言,长度: {len(host_speech)}字符")
except Exception as e:
self.log_info(f"读取HOST发言失败: {str(e)}")
# 转换为JSON字符串
message = json.dumps(data, ensure_ascii=False)
# 如果有HOST发言,添加到消息前面作为参考
if FORUM_READER_AVAILABLE and 'host_speech' in data and data['host_speech']:
formatted_host = format_host_speech_for_prompt(data['host_speech'])
message = formatted_host + "\n" + message
self.log_info("正在生成反思总结")
... ...
... ... @@ -18,6 +18,17 @@ from ..utils.text_processing import (
format_search_results_for_prompt
)
# 导入论坛读取工具
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
try:
from utils.forum_reader import get_latest_host_speech, format_host_speech_for_prompt
FORUM_READER_AVAILABLE = True
except ImportError:
FORUM_READER_AVAILABLE = False
print("警告: 无法导入forum_reader模块,将跳过HOST发言读取功能")
class FirstSummaryNode(StateMutationNode):
"""根据搜索结果生成段落首次总结的节点"""
... ... @@ -62,9 +73,28 @@ class FirstSummaryNode(StateMutationNode):
# 准备输入数据
if isinstance(input_data, str):
message = input_data
data = json.loads(input_data)
else:
message = json.dumps(input_data, ensure_ascii=False)
data = input_data.copy() if isinstance(input_data, dict) else input_data
# 读取最新的HOST发言(如果可用)
if FORUM_READER_AVAILABLE:
try:
host_speech = get_latest_host_speech()
if host_speech:
# 将HOST发言添加到输入数据中
data['host_speech'] = host_speech
self.log_info(f"已读取HOST发言,长度: {len(host_speech)}字符")
except Exception as e:
self.log_info(f"读取HOST发言失败: {str(e)}")
# 转换为JSON字符串
message = json.dumps(data, ensure_ascii=False)
# 如果有HOST发言,添加到消息前面作为参考
if FORUM_READER_AVAILABLE and 'host_speech' in data and data['host_speech']:
formatted_host = format_host_speech_for_prompt(data['host_speech'])
message = formatted_host + "\n" + message
self.log_info("正在生成首次段落总结")
... ... @@ -212,9 +242,28 @@ class ReflectionSummaryNode(StateMutationNode):
# 准备输入数据
if isinstance(input_data, str):
message = input_data
data = json.loads(input_data)
else:
message = json.dumps(input_data, ensure_ascii=False)
data = input_data.copy() if isinstance(input_data, dict) else input_data
# 读取最新的HOST发言(如果可用)
if FORUM_READER_AVAILABLE:
try:
host_speech = get_latest_host_speech()
if host_speech:
# 将HOST发言添加到输入数据中
data['host_speech'] = host_speech
self.log_info(f"已读取HOST发言,长度: {len(host_speech)}字符")
except Exception as e:
self.log_info(f"读取HOST发言失败: {str(e)}")
# 转换为JSON字符串
message = json.dumps(data, ensure_ascii=False)
# 如果有HOST发言,添加到消息前面作为参考
if FORUM_READER_AVAILABLE and 'host_speech' in data and data['host_speech']:
formatted_host = format_host_speech_for_prompt(data['host_speech'])
message = formatted_host + "\n" + message
self.log_info("正在生成反思总结")
... ...
... ... @@ -18,6 +18,17 @@ from ..utils.text_processing import (
format_search_results_for_prompt
)
# 导入论坛读取工具
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
try:
from utils.forum_reader import get_latest_host_speech, format_host_speech_for_prompt
FORUM_READER_AVAILABLE = True
except ImportError:
FORUM_READER_AVAILABLE = False
print("警告: 无法导入forum_reader模块,将跳过HOST发言读取功能")
class FirstSummaryNode(StateMutationNode):
"""根据搜索结果生成段落首次总结的节点"""
... ... @@ -62,9 +73,28 @@ class FirstSummaryNode(StateMutationNode):
# 准备输入数据
if isinstance(input_data, str):
message = input_data
data = json.loads(input_data)
else:
message = json.dumps(input_data, ensure_ascii=False)
data = input_data.copy() if isinstance(input_data, dict) else input_data
# 读取最新的HOST发言(如果可用)
if FORUM_READER_AVAILABLE:
try:
host_speech = get_latest_host_speech()
if host_speech:
# 将HOST发言添加到输入数据中
data['host_speech'] = host_speech
self.log_info(f"已读取HOST发言,长度: {len(host_speech)}字符")
except Exception as e:
self.log_info(f"读取HOST发言失败: {str(e)}")
# 转换为JSON字符串
message = json.dumps(data, ensure_ascii=False)
# 如果有HOST发言,添加到消息前面作为参考
if FORUM_READER_AVAILABLE and 'host_speech' in data and data['host_speech']:
formatted_host = format_host_speech_for_prompt(data['host_speech'])
message = formatted_host + "\n" + message
self.log_info("正在生成首次段落总结")
... ... @@ -212,9 +242,28 @@ class ReflectionSummaryNode(StateMutationNode):
# 准备输入数据
if isinstance(input_data, str):
message = input_data
data = json.loads(input_data)
else:
message = json.dumps(input_data, ensure_ascii=False)
data = input_data.copy() if isinstance(input_data, dict) else input_data
# 读取最新的HOST发言(如果可用)
if FORUM_READER_AVAILABLE:
try:
host_speech = get_latest_host_speech()
if host_speech:
# 将HOST发言添加到输入数据中
data['host_speech'] = host_speech
self.log_info(f"已读取HOST发言,长度: {len(host_speech)}字符")
except Exception as e:
self.log_info(f"读取HOST发言失败: {str(e)}")
# 转换为JSON字符串
message = json.dumps(data, ensure_ascii=False)
# 如果有HOST发言,添加到消息前面作为参考
if FORUM_READER_AVAILABLE and 'host_speech' in data and data['host_speech']:
formatted_host = format_host_speech_for_prompt(data['host_speech'])
message = formatted_host + "\n" + message
self.log_info("正在生成反思总结")
... ...
... ... @@ -58,16 +58,18 @@ Say goodbye to traditional data dashboards. In "WeiYu", everything starts with a
### A Complete Analysis Workflow
| Step | Phase Name | Main Operations | Participating Components |
|------|------------|-----------------|-------------------------|
| 1 | User Query | Flask main application receives the query | Flask Main Application |
| 2 | Parallel Launch | Three Agents start working simultaneously | Query Agent, Media Agent, Insight Agent |
| 3 | Preliminary Analysis | Each Agent uses dedicated tools for overview search | Each Agent + Dedicated Toolsets |
| 4 | Strategy Formulation | Develop segmented research strategies based on preliminary results | Internal Decision Modules of Each Agent |
| 5 | In-depth Research | Multi-round search and reflection mechanisms calling respective tools | Each Agent + Reflection Mechanisms |
| 6 | Forum Collaboration | ForumEngine accepts key findings from each Agent and facilitates Agent communication | ForumEngine + All Agents |
| 7 | Result Integration | Report Agent collects all analysis results and forum content | Report Agent |
| 8 | Report Generation | Dynamically select templates and styles, generate final reports through multiple rounds | Report Agent + Template Engine |
| Step | Phase Name | Main Operations | Participating Components | Cycle Nature |
|------|------------|-----------------|-------------------------|--------------|
| 1 | User Query | Flask main application receives the query | Flask Main Application | - |
| 2 | Parallel Launch | Three Agents start working simultaneously | Query Agent, Media Agent, Insight Agent | - |
| 3 | Preliminary Analysis | Each Agent uses dedicated tools for overview search | Each Agent + Dedicated Toolsets | - |
| 4 | Strategy Formulation | Develop segmented research strategies based on preliminary results | Internal Decision Modules of Each Agent | - |
| 5-N | **Iterative Phase** | **Forum Collaboration + In-depth Research** | **ForumEngine + All Agents** | **Multi-round cycles** |
| 5.1 | In-depth Research | Each Agent conducts specialized search guided by forum host | Each Agent + Reflection Mechanisms + Forum Guidance | Each cycle |
| 5.2 | Forum Collaboration | ForumEngine monitors Agent communications and generates host summaries | ForumEngine + LLM Host | Each cycle |
| 5.3 | Communication Integration | Each Agent adjusts research directions based on discussions | Each Agent + forum_reader tool | Each cycle |
| N+1 | Result Integration | Report Agent collects all analysis results and forum content | Report Agent | - |
| N+2 | Report Generation | Dynamically select templates and styles, generate final reports through multiple rounds | Report Agent + Template Engine | - |
### Project Code Structure Tree
... ... @@ -161,6 +163,8 @@ Weibo_PublicOpinion_AnalysisSystem/
├── logs/ # Runtime log directory
├── final_reports/ # Final generated HTML report files
├── utils/ # Common utility functions
│ ├── forum_reader.py # Agent forum communication
│ └── retry_helper.py # Network request retry mechanism tool
├── app.py # Flask main application entry
├── config.py # Global configuration file
└── requirements.txt # Python dependency list
... ...
... ... @@ -58,16 +58,18 @@
### 一次完整分析流程
| 步骤 | 阶段名称 | 主要操作 | 参与组件 |
|------|----------|----------|----------|
| 1 | 用户提问 | Flask主应用接收查询 | Flask主应用 |
| 2 | 并行启动 | 三个Agent同时开始工作 | Query Agent、Media Agent、Insight Agent |
| 3 | 初步分析 | 各Agent使用专属工具进行概览搜索 | 各Agent + 专属工具集 |
| 4 | 策略制定 | 基于初步结果制定分块研究策略 | 各Agent内部决策模块 |
| 5 | 深度研究 | 多轮搜索与反思机制调用各自工具 | 各Agent + 反思机制 |
| 6 | 论坛协作 | ForumEngine接受各Agent关键发现并促进Agent交流 | ForumEngine + 所有Agent |
| 7 | 结果整合 | Report Agent收集所有分析结果和论坛内容 | Report Agent |
| 8 | 报告生成 | 动态选择模板和样式,多轮生成最终报告 | Report Agent + 模板引擎 |
| 步骤 | 阶段名称 | 主要操作 | 参与组件 | 循环特性 |
|------|----------|----------|----------|----------|
| 1 | 用户提问 | Flask主应用接收查询 | Flask主应用 | - |
| 2 | 并行启动 | 三个Agent同时开始工作 | Query Agent、Media Agent、Insight Agent | - |
| 3 | 初步分析 | 各Agent使用专属工具进行概览搜索 | 各Agent + 专属工具集 | - |
| 4 | 策略制定 | 基于初步结果制定分块研究策略 | 各Agent内部决策模块 | - |
| 5-N | **循环阶段** | **论坛协作 + 深度研究** | **ForumEngine + 所有Agent** | **多轮循环** |
| 5.1 | 深度研究 | 各Agent基于论坛主持人引导进行专项搜索 | 各Agent + 反思机制 + 论坛引导 | 每轮循环 |
| 5.2 | 论坛协作 | ForumEngine监控Agent发言并生成主持人总结 | ForumEngine + LLM主持人 | 每轮循环 |
| 5.3 | 交流融合 | 各Agent根据讨论调整研究方向 | 各Agent + forum_reader工具 | 每轮循环 |
| N+1 | 结果整合 | Report Agent收集所有分析结果和论坛内容 | Report Agent | - |
| N+2 | 报告生成 | 动态选择模板和样式,多轮生成最终报告 | Report Agent + 模板引擎 | - |
### 项目代码结构树
... ... @@ -161,6 +163,8 @@ Weibo_PublicOpinion_AnalysisSystem/
├── logs/ # 运行日志目录
├── final_reports/ # 最终生成的HTML报告文件
├── utils/ # 通用工具函数
│ ├── forum_reader.py # Agent间论坛通信
│ └── retry_helper.py # 网络请求重试机制工具
├── app.py # Flask主应用入口
├── config.py # 全局配置文件
└── requirements.txt # Python依赖包清单
... ...
... ... @@ -26,7 +26,7 @@ TAVILY_API_KEY = "your_tavily_api_key"
KIMI_API_KEY = "your_kimi_api_key"
# Gemini API Key (via OpenAI format proxy)
# 申请地址https://api.chataiapi.com/
# 这里我用了一个中转api来接入Gemini,申请地址https://api.chataiapi.com/,你也可以使用其他
GEMINI_API_KEY = "your_gemini_api_key"
# Bocha Search API Key
... ...
"""
Forum日志读取工具
用于读取forum.log中的最新HOST发言
"""
import re
from pathlib import Path
from typing import Optional, List, Dict
import logging
logger = logging.getLogger(__name__)
def get_latest_host_speech(log_dir: str = "logs") -> Optional[str]:
"""
获取forum.log中最新的HOST发言
Args:
log_dir: 日志目录路径
Returns:
最新的HOST发言内容,如果没有则返回None
"""
try:
forum_log_path = Path(log_dir) / "forum.log"
if not forum_log_path.exists():
logger.debug("forum.log文件不存在")
return None
with open(forum_log_path, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
# 从后往前查找最新的HOST发言
host_speech = None
for line in reversed(lines):
# 匹配格式: [时间] [HOST] 内容
match = re.match(r'\[(\d{2}:\d{2}:\d{2})\]\s*\[HOST\]\s*(.+)', line)
if match:
_, content = match.groups()
# 处理转义的换行符,还原为实际换行
host_speech = content.replace('\\n', '\n').strip()
break
if host_speech:
logger.info(f"找到最新的HOST发言,长度: {len(host_speech)}字符")
else:
logger.debug("未找到HOST发言")
return host_speech
except Exception as e:
logger.error(f"读取forum.log失败: {str(e)}")
return None
def get_all_host_speeches(log_dir: str = "logs") -> List[Dict[str, str]]:
"""
获取forum.log中所有的HOST发言
Args:
log_dir: 日志目录路径
Returns:
包含所有HOST发言的列表,每个元素是包含timestamp和content的字典
"""
try:
forum_log_path = Path(log_dir) / "forum.log"
if not forum_log_path.exists():
logger.debug("forum.log文件不存在")
return []
with open(forum_log_path, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
host_speeches = []
for line in lines:
# 匹配格式: [时间] [HOST] 内容
match = re.match(r'\[(\d{2}:\d{2}:\d{2})\]\s*\[HOST\]\s*(.+)', line)
if match:
timestamp, content = match.groups()
# 处理转义的换行符
content = content.replace('\\n', '\n').strip()
host_speeches.append({
'timestamp': timestamp,
'content': content
})
logger.info(f"找到{len(host_speeches)}条HOST发言")
return host_speeches
except Exception as e:
logger.error(f"读取forum.log失败: {str(e)}")
return []
def get_recent_agent_speeches(log_dir: str = "logs", limit: int = 5) -> List[Dict[str, str]]:
"""
获取forum.log中最近的Agent发言(不包括HOST)
Args:
log_dir: 日志目录路径
limit: 返回的最大发言数量
Returns:
包含最近Agent发言的列表
"""
try:
forum_log_path = Path(log_dir) / "forum.log"
if not forum_log_path.exists():
return []
with open(forum_log_path, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
agent_speeches = []
for line in reversed(lines): # 从后往前读取
# 匹配格式: [时间] [AGENT_NAME] 内容
match = re.match(r'\[(\d{2}:\d{2}:\d{2})\]\s*\[(INSIGHT|MEDIA|QUERY)\]\s*(.+)', line)
if match:
timestamp, agent, content = match.groups()
# 处理转义的换行符
content = content.replace('\\n', '\n').strip()
agent_speeches.append({
'timestamp': timestamp,
'agent': agent,
'content': content
})
if len(agent_speeches) >= limit:
break
agent_speeches.reverse() # 恢复时间顺序
return agent_speeches
except Exception as e:
logger.error(f"读取forum.log失败: {str(e)}")
return []
def format_host_speech_for_prompt(host_speech: str) -> str:
"""
格式化HOST发言,用于添加到prompt中
Args:
host_speech: HOST发言内容
Returns:
格式化后的内容
"""
if not host_speech:
return ""
return f"""
### 论坛主持人最新总结
以下是论坛主持人对各Agent讨论的最新总结和引导,请参考其中的观点和建议:
{host_speech}
---
"""
... ...