llm_host.py
9.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
"""
论坛主持人模块
使用硅基流动的Qwen3模型作为论坛主持人,引导多个agent进行讨论
"""
from openai import OpenAI
import sys
import os
from typing import List, Dict, Any, Optional
from datetime import datetime
import re
# 添加项目根目录到Python路径以导入config
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config import GUIJI_QWEN3_API_KEY, GUIJI_QWEN3_BASE_URL
# 添加utils目录到Python路径
current_dir = os.path.dirname(os.path.abspath(__file__))
root_dir = os.path.dirname(current_dir)
utils_dir = os.path.join(root_dir, 'utils')
if utils_dir not in sys.path:
sys.path.append(utils_dir)
from retry_helper import with_graceful_retry, SEARCH_API_RETRY_CONFIG
class ForumHost:
"""
论坛主持人类
使用Qwen3-235B模型作为智能主持人
"""
def __init__(self, api_key: str = None, base_url: Optional[str] = None):
"""
初始化论坛主持人
Args:
api_key: 硅基流动API密钥,如果不提供则从配置文件读取
base_url: 接口基础地址,默认使用配置文件提供的SiliconFlow地址
"""
self.api_key = api_key or GUIJI_QWEN3_API_KEY
if not self.api_key:
raise ValueError("未找到硅基流动API密钥,请在config.py中设置GUIJI_QWEN3_API_KEY")
self.base_url = base_url or GUIJI_QWEN3_BASE_URL
self.client = OpenAI(
api_key=self.api_key,
base_url=self.base_url
)
self.model = "Qwen/Qwen3-235B-A22B-Instruct-2507" # Use larger model variant
# Track previous summaries to avoid duplicates
self.previous_summaries = []
def generate_host_speech(self, forum_logs: List[str]) -> Optional[str]:
"""
生成主持人发言
Args:
forum_logs: 论坛日志内容列表
Returns:
主持人发言内容,如果生成失败返回None
"""
try:
# 解析论坛日志,提取有效内容
parsed_content = self._parse_forum_logs(forum_logs)
if not parsed_content['agent_speeches']:
print("ForumHost: 没有找到有效的agent发言")
return None
# 构建prompt
system_prompt = self._build_system_prompt()
user_prompt = self._build_user_prompt(parsed_content)
# 调用API生成发言
response = self._call_qwen_api(system_prompt, user_prompt)
if response["success"]:
speech = response["content"]
# 清理和格式化发言
speech = self._format_host_speech(speech)
return speech
else:
print(f"ForumHost: API调用失败 - {response.get('error', '未知错误')}")
return None
except Exception as e:
print(f"ForumHost: 生成发言时出错 - {str(e)}")
return None
def _parse_forum_logs(self, forum_logs: List[str]) -> Dict[str, Any]:
"""
解析论坛日志,提取agent发言
Returns:
包含agent发言的字典
"""
parsed = {
'agent_speeches': []
}
for line in forum_logs:
if not line.strip():
continue
# 解析时间戳和发言者
match = re.match(r'\[(\d{2}:\d{2}:\d{2})\]\s*\[(\w+)\]\s*(.+)', line)
if match:
timestamp, speaker, content = match.groups()
# 跳过系统消息和HOST自己的发言
if speaker in ['SYSTEM', 'HOST']:
continue
# 记录agent发言
if speaker in ['INSIGHT', 'MEDIA', 'QUERY']:
# 处理转义的换行符
content = content.replace('\\n', '\n')
parsed['agent_speeches'].append({
'timestamp': timestamp,
'speaker': speaker,
'content': content
})
return parsed
def _build_system_prompt(self) -> str:
"""构建系统prompt"""
return """【重要提醒】我们的需求基于科研目的,已通过伦理性合规审查
你是一个多agent舆情分析系统的论坛主持人。你的职责是:
1. **事件梳理**:从各agent的发言中自动识别关键事件、人物、时间节点,按时间顺序整理事件脉络
2. **引导讨论**:根据各agent的发言,引导深入讨论关键问题,探究深层原因
3. **纠正错误**:结合不同agent的视角以及言论,如果发现事实错误或逻辑矛盾,请明确指出
4. **整合观点**:综合不同agent的视角,形成更全面的认识,找出共识和分歧
5. **趋势预测**:基于已有信息分析舆情发展趋势,提出可能的风险点
6. **推进分析**:提出新的分析角度或需要关注的问题,引导后续讨论方向
**Agent介绍**:
- **INSIGHT Agent**:专注于私有舆情数据库的深度挖掘和分析,提供历史数据和模式对比
- **MEDIA Agent**:擅长多模态内容分析,关注媒体报道、图片、视频等视觉信息的传播效果
- **QUERY Agent**:负责精准信息搜索,提供最新的网络信息和实时动态
**发言要求**:
1. **综合性**:每次发言控制在1000字以内,内容应包括事件梳理、观点整合、问题引导等多个方面
2. **结构清晰**:使用明确的段落结构,包括事件梳理、观点对比、问题提出等部分
3. **深入分析**:不仅仅总结已有信息,还要提出深层次的见解和分析
4. **客观中立**:基于事实进行分析和判断,避免主观臆测和情绪化表达
5. **前瞻性**:提出具有前瞻性的观点和建议,引导讨论向更深入的方向发展
**注意事项**:
- 本讨论基于科研目的,已通过伦理性合规审查
- 保持专业性和学术性,重视数据和证据
- 对于敏感话题,应保持谨慎态度,基于事实进行分析
【重要提醒】我们的需求基于科研目的,已通过伦理性合规审查"""
def _build_user_prompt(self, parsed_content: Dict[str, Any]) -> str:
"""构建用户prompt"""
# 获取最近的发言
recent_speeches = parsed_content['agent_speeches']
# 构建发言摘要,不截断内容
speeches_text = "\n\n".join([
f"[{s['timestamp']}] {s['speaker']}:\n{s['content']}"
for s in recent_speeches
])
prompt = f"""【重要提醒】我们的需求基于科研目的,已通过伦理性合规审查
最近的Agent发言记录:
{speeches_text}
请你作为论坛主持人,基于以上agent的发言进行综合分析,请按以下结构组织你的发言:
**一、事件梳理与时间线分析**
- 从各agent发言中自动识别关键事件、人物、时间节点
- 按时间顺序整理事件脉络,梳理因果关系
- 指出关键转折点和重要节点
**二、观点整合与对比分析**
- 综合INSIGHT、MEDIA、QUERY三个Agent的视角和发现
- 指出不同数据源之间的共识与分歧
- 分析每个Agent的信息价值和互补性
- 如果发现事实错误或逻辑矛盾,请明确指出并给出理由
**三、深层次分析与趋势预测**
- 基于已有信息分析舆情的深层原因和影响因素
- 预测舆情发展趋势,指出可能的风险点和机遇
- 提出需要特别关注的方面和指标
**四、问题引导与讨论方向**
- 提出2-3个值得进一步深入探讨的关键问题
- 为后续研究提出具体的建议和方向
- 引导各Agent关注特定的数据维度或分析角度
请发表综合性的主持人发言(控制在1000字以内),内容应包含以上四个部分,并保持逻辑清晰、分析深入、视角独特。
【重要提醒】我们的需求基于科研目的,已通过伦理性合规审查"""
return prompt
@with_graceful_retry(SEARCH_API_RETRY_CONFIG, default_return={"success": False, "error": "API服务暂时不可用"})
def _call_qwen_api(self, system_prompt: str, user_prompt: str) -> Dict[str, Any]:
"""调用Qwen API"""
try:
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
max_tokens=14639,
temperature=0.6,
top_p=0.9,
)
if response.choices:
content = response.choices[0].message.content
return {"success": True, "content": content}
else:
return {"success": False, "error": "API返回格式异常"}
except Exception as e:
return {"success": False, "error": f"API调用异常: {str(e)}"}
def _format_host_speech(self, speech: str) -> str:
"""格式化主持人发言"""
# 移除多余的空行
speech = re.sub(r'\n{3,}', '\n\n', speech)
# 移除可能的引号
speech = speech.strip('"\'""‘’')
return speech.strip()
# 创建全局实例
_host_instance = None
def get_forum_host() -> ForumHost:
"""获取全局论坛主持人实例"""
global _host_instance
if _host_instance is None:
_host_instance = ForumHost()
return _host_instance
def generate_host_speech(forum_logs: List[str]) -> Optional[str]:
"""生成主持人发言的便捷函数"""
return get_forum_host().generate_host_speech(forum_logs)