keyword_optimizer.py
11.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
"""
关键词优化中间件
使用Qwen AI将Agent生成的搜索词优化为更适合舆情数据库查询的关键词
"""
import requests
import json
import sys
import os
from typing import List, Dict, Any
from dataclasses import dataclass
# 添加项目根目录到Python路径以导入config
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
from config import GUIJI_QWEN3_API_KEY
@dataclass
class KeywordOptimizationResponse:
"""关键词优化响应"""
original_query: str
optimized_keywords: List[str]
reasoning: str
success: bool
error_message: str = ""
class KeywordOptimizer:
"""
关键词优化器
使用硅基流动的Qwen3模型将Agent生成的搜索词优化为更贴近真实舆情的关键词
"""
def __init__(self, api_key: str = None):
"""
初始化关键词优化器
Args:
api_key: 硅基流动API密钥,如果不提供则从配置文件读取
"""
self.api_key = api_key or GUIJI_QWEN3_API_KEY
self.base_url = "https://api.siliconflow.cn/v1/chat/completions"
self.model = "Qwen/Qwen3-30B-A3B-Instruct-2507"
if not self.api_key:
raise ValueError("未找到硅基流动API密钥,请在config.py中设置GUIJI_QWEN3_API_KEY")
def optimize_keywords(self, original_query: str, context: str = "") -> KeywordOptimizationResponse:
"""
优化搜索关键词
Args:
original_query: Agent生成的原始搜索查询
context: 额外的上下文信息(如段落标题、内容描述等)
Returns:
KeywordOptimizationResponse: 优化后的关键词列表
"""
print(f"🔍 关键词优化中间件: 处理查询 '{original_query}'")
try:
# 构建优化prompt
system_prompt = self._build_system_prompt()
user_prompt = self._build_user_prompt(original_query, context)
# 调用Qwen API
response = self._call_qwen_api(system_prompt, user_prompt)
if response["success"]:
# 解析响应
content = response["content"]
try:
# 尝试解析JSON格式的响应
if content.strip().startswith('{'):
parsed = json.loads(content)
keywords = parsed.get("keywords", [])
reasoning = parsed.get("reasoning", "")
else:
# 如果不是JSON格式,尝试从文本中提取关键词
keywords = self._extract_keywords_from_text(content)
reasoning = content
# 验证关键词质量
validated_keywords = self._validate_keywords(keywords)
print(f"✅ 优化成功: {len(validated_keywords)}个关键词")
for i, keyword in enumerate(validated_keywords, 1):
print(f" {i}. '{keyword}'")
return KeywordOptimizationResponse(
original_query=original_query,
optimized_keywords=validated_keywords,
reasoning=reasoning,
success=True
)
except Exception as e:
print(f"⚠️ 解析响应失败,使用备用方案: {str(e)}")
# 备用方案:从原始查询中提取关键词
fallback_keywords = self._fallback_keyword_extraction(original_query)
return KeywordOptimizationResponse(
original_query=original_query,
optimized_keywords=fallback_keywords,
reasoning="API响应解析失败,使用备用关键词提取",
success=True
)
else:
print(f"❌ API调用失败: {response['error']}")
# 使用备用方案
fallback_keywords = self._fallback_keyword_extraction(original_query)
return KeywordOptimizationResponse(
original_query=original_query,
optimized_keywords=fallback_keywords,
reasoning="API调用失败,使用备用关键词提取",
success=True,
error_message=response['error']
)
except Exception as e:
print(f"❌ 关键词优化失败: {str(e)}")
# 最终备用方案
fallback_keywords = self._fallback_keyword_extraction(original_query)
return KeywordOptimizationResponse(
original_query=original_query,
optimized_keywords=fallback_keywords,
reasoning="系统错误,使用备用关键词提取",
success=False,
error_message=str(e)
)
def _build_system_prompt(self) -> str:
"""构建系统prompt"""
return """你是一位专业的舆情数据挖掘专家。你的任务是将用户提供的搜索查询优化为更适合在社交媒体舆情数据库中查找的关键词。
**核心原则**:
1. **贴近网民语言**:使用普通网友在社交媒体上会使用的词汇
2. **避免专业术语**:不使用"舆情"、"传播"、"倾向"、"展望"等官方词汇
3. **简洁具体**:每个关键词要非常简洁明了,便于数据库匹配
4. **情感丰富**:包含网民常用的情感表达词汇
5. **数量控制**:最少提供10个关键词,最多提供20个关键词
6. **避免重复**:不要脱离初始查询的主题
**输出格式**:
请以JSON格式返回结果:
{
"keywords": ["关键词1", "关键词2", "关键词3"],
"reasoning": "选择这些关键词的理由"
}
**示例**:
输入:"武汉大学舆情管理 未来展望 发展趋势"
输出:
{
"keywords": ["武大", "武汉大学", "学校管理", "大学", "教育"],
"reasoning": "选择'武大'和'武汉大学'作为核心词汇,这是网民最常使用的称呼;'学校管理'比'舆情管理'更贴近日常表达;避免使用'未来展望'、'发展趋势'等网民很少使用的专业术语"
}"""
def _build_user_prompt(self, original_query: str, context: str) -> str:
"""构建用户prompt"""
prompt = f"请将以下搜索查询优化为适合舆情数据库查询的关键词:\n\n原始查询:{original_query}"
if context:
prompt += f"\n\n上下文信息:{context}"
prompt += "\n\n请记住:要使用网民在社交媒体上真实使用的词汇,避免官方术语和专业词汇。"
return prompt
def _call_qwen_api(self, system_prompt: str, user_prompt: str) -> Dict[str, Any]:
"""调用Qwen API"""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
data = {
"model": self.model,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
"max_tokens": 10000,
"temperature": 0.7
}
try:
response = requests.post(self.base_url, headers=headers, json=data, timeout=30)
response.raise_for_status()
result = response.json()
if "choices" in result and len(result["choices"]) > 0:
content = result["choices"][0]["message"]["content"]
return {"success": True, "content": content}
else:
return {"success": False, "error": "API返回格式异常"}
except requests.exceptions.RequestException as e:
return {"success": False, "error": f"网络请求错误: {str(e)}"}
except Exception as e:
return {"success": False, "error": f"API调用异常: {str(e)}"}
def _extract_keywords_from_text(self, text: str) -> List[str]:
"""从文本中提取关键词(当JSON解析失败时使用)"""
# 简单的关键词提取逻辑
lines = text.split('\n')
keywords = []
for line in lines:
line = line.strip()
# 查找可能的关键词
if ':' in line or ':' in line:
parts = line.split(':') if ':' in line else line.split(':')
if len(parts) > 1:
potential_keywords = parts[1].strip()
# 尝试分割关键词
if '、' in potential_keywords:
keywords.extend([k.strip() for k in potential_keywords.split('、')])
elif ',' in potential_keywords:
keywords.extend([k.strip() for k in potential_keywords.split(',')])
else:
keywords.append(potential_keywords)
# 如果没有找到,尝试其他方法
if not keywords:
# 查找引号中的内容
import re
quoted_content = re.findall(r'["""\'](.*?)["""\']', text)
keywords.extend(quoted_content)
# 清理和验证关键词
cleaned_keywords = []
for keyword in keywords[:20]: # 最多20个
keyword = keyword.strip().strip('"\'""''')
if keyword and len(keyword) <= 20: # 合理长度
cleaned_keywords.append(keyword)
return cleaned_keywords[:20]
def _validate_keywords(self, keywords: List[str]) -> List[str]:
"""验证和清理关键词"""
validated = []
# 不良关键词(过于专业或官方)
bad_keywords = {
'态度分析', '公众反应', '情绪倾向',
'未来展望', '发展趋势', '战略规划', '政策导向', '管理机制'
}
for keyword in keywords:
if isinstance(keyword, str):
keyword = keyword.strip().strip('"\'""''')
# 基本验证
if (keyword and
len(keyword) <= 20 and
len(keyword) >= 1 and
not any(bad_word in keyword for bad_word in bad_keywords)):
validated.append(keyword)
return validated[:20] # 最多返回20个关键词
def _fallback_keyword_extraction(self, original_query: str) -> List[str]:
"""备用关键词提取方案"""
# 简单的关键词提取逻辑
# 移除常见的无用词汇
stop_words = {'、'}
# 分割查询
import re
# 按空格、标点分割
tokens = re.split(r'[\s,。!?;:、]+', original_query)
keywords = []
for token in tokens:
token = token.strip()
if token and token not in stop_words and len(token) >= 2:
keywords.append(token)
# 如果没有有效关键词,使用原始查询的第一个词
if not keywords:
first_word = original_query.split()[0] if original_query.split() else original_query
keywords = [first_word] if first_word else ["热门"]
return keywords[:20]
# 全局实例
keyword_optimizer = KeywordOptimizer()