keyword_manager.py
11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
DeepSentimentCrawling模块 - 关键词管理器
从BroadTopicExtraction模块获取关键词并分配给不同平台进行爬取
"""
import sys
import json
from datetime import date, timedelta, datetime
from pathlib import Path
from typing import List, Dict, Optional
import random
import pymysql
from pymysql.cursors import DictCursor
# 添加项目根目录到路径
project_root = Path(__file__).parent.parent
sys.path.append(str(project_root))
try:
import config
except ImportError:
raise ImportError("无法导入config.py配置文件")
class KeywordManager:
"""关键词管理器"""
def __init__(self):
"""初始化关键词管理器"""
self.connection = None
self.connect()
def connect(self):
"""连接数据库"""
try:
self.connection = pymysql.connect(
host=config.DB_HOST,
port=config.DB_PORT,
user=config.DB_USER,
password=config.DB_PASSWORD,
database=config.DB_NAME,
charset=config.DB_CHARSET,
autocommit=True,
cursorclass=DictCursor
)
print(f"关键词管理器成功连接到数据库: {config.DB_NAME}")
except Exception as e:
print(f"关键词管理器数据库连接失败: {e}")
raise
def get_latest_keywords(self, target_date: date = None, max_keywords: int = 100) -> List[str]:
"""
获取最新的关键词列表
Args:
target_date: 目标日期,默认为今天
max_keywords: 最大关键词数量
Returns:
关键词列表
"""
if not target_date:
target_date = date.today()
print(f"正在获取 {target_date} 的关键词...")
# 首先尝试获取指定日期的关键词
topics_data = self.get_daily_topics(target_date)
if topics_data and topics_data.get('keywords'):
keywords = topics_data['keywords']
print(f"成功获取 {target_date} 的 {len(keywords)} 个关键词")
# 如果关键词太多,随机选择指定数量
if len(keywords) > max_keywords:
keywords = random.sample(keywords, max_keywords)
print(f"随机选择了 {max_keywords} 个关键词")
return keywords
# 如果没有当天的关键词,尝试获取最近几天的
print(f"{target_date} 没有关键词数据,尝试获取最近的关键词...")
recent_topics = self.get_recent_topics(days=7)
if recent_topics:
# 合并最近几天的关键词
all_keywords = []
for topic in recent_topics:
if topic.get('keywords'):
all_keywords.extend(topic['keywords'])
# 去重并限制数量
unique_keywords = list(set(all_keywords))
if len(unique_keywords) > max_keywords:
unique_keywords = random.sample(unique_keywords, max_keywords)
print(f"从最近7天的数据中获取到 {len(unique_keywords)} 个关键词")
return unique_keywords
# 如果都没有,返回默认关键词
print("没有找到任何关键词数据,使用默认关键词")
return self._get_default_keywords()
def get_daily_topics(self, extract_date: date = None) -> Optional[Dict]:
"""
获取每日话题分析
Args:
extract_date: 提取日期,默认为今天
Returns:
话题分析数据,如果不存在返回None
"""
if not extract_date:
extract_date = date.today()
try:
cursor = self.connection.cursor()
query = "SELECT * FROM daily_topics WHERE extract_date = %s"
cursor.execute(query, (extract_date,))
result = cursor.fetchone()
if result:
# 解析关键词JSON
result['keywords'] = json.loads(result['keywords'])
return result
else:
return None
except Exception as e:
print(f"获取话题分析失败: {e}")
return None
def get_recent_topics(self, days: int = 7) -> List[Dict]:
"""
获取最近几天的话题分析
Args:
days: 天数
Returns:
话题分析列表
"""
try:
cursor = self.connection.cursor()
query = """
SELECT * FROM daily_topics
WHERE extract_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
ORDER BY extract_date DESC
"""
cursor.execute(query, (days,))
results = cursor.fetchall()
# 解析每个结果的关键词JSON
for result in results:
result['keywords'] = json.loads(result['keywords'])
return results
except Exception as e:
print(f"获取最近话题分析失败: {e}")
return []
def _get_default_keywords(self) -> List[str]:
"""获取默认关键词列表"""
return [
"科技", "人工智能", "AI", "编程", "互联网",
"创业", "投资", "理财", "股市", "经济",
"教育", "学习", "考试", "大学", "就业",
"健康", "养生", "运动", "美食", "旅游",
"时尚", "美妆", "购物", "生活", "家居",
"电影", "音乐", "游戏", "娱乐", "明星",
"新闻", "热点", "社会", "政策", "环保"
]
def get_all_keywords_for_platforms(self, platforms: List[str], target_date: date = None,
max_keywords: int = 100) -> List[str]:
"""
为所有平台获取相同的关键词列表
Args:
platforms: 平台列表
target_date: 目标日期
max_keywords: 最大关键词数量
Returns:
关键词列表(所有平台共用)
"""
keywords = self.get_latest_keywords(target_date, max_keywords)
if keywords:
print(f"为 {len(platforms)} 个平台准备了相同的 {len(keywords)} 个关键词")
print(f"每个关键词将在所有平台上进行爬取")
return keywords
def get_keywords_for_platform(self, platform: str, target_date: date = None,
max_keywords: int = 50) -> List[str]:
"""
为特定平台获取关键词(现在所有平台使用相同关键词)
Args:
platform: 平台名称
target_date: 目标日期
max_keywords: 最大关键词数量
Returns:
关键词列表(与其他平台相同)
"""
keywords = self.get_latest_keywords(target_date, max_keywords)
print(f"为平台 {platform} 准备了 {len(keywords)} 个关键词(与其他平台相同)")
return keywords
def _filter_keywords_by_platform(self, keywords: List[str], platform: str) -> List[str]:
"""
根据平台特性过滤关键词
Args:
keywords: 原始关键词列表
platform: 平台名称
Returns:
过滤后的关键词列表
"""
# 平台特性关键词映射(可以根据需要调整)
platform_preferences = {
'xhs': ['美妆', '时尚', '生活', '美食', '旅游', '购物', '健康', '养生'],
'dy': ['娱乐', '音乐', '舞蹈', '搞笑', '美食', '生活', '科技', '教育'],
'ks': ['生活', '搞笑', '农村', '美食', '手工', '音乐', '娱乐'],
'bili': ['科技', '游戏', '动漫', '学习', '编程', '数码', '科普'],
'wb': ['热点', '新闻', '娱乐', '明星', '社会', '时事', '科技'],
'tieba': ['游戏', '动漫', '学习', '生活', '兴趣', '讨论'],
'zhihu': ['知识', '学习', '科技', '职场', '投资', '教育', '思考']
}
# 如果平台有特定偏好,优先选择相关关键词
preferred_keywords = platform_preferences.get(platform, [])
if preferred_keywords:
# 先选择平台偏好的关键词
filtered = []
remaining = []
for keyword in keywords:
if any(pref in keyword for pref in preferred_keywords):
filtered.append(keyword)
else:
remaining.append(keyword)
# 如果偏好关键词不够,补充其他关键词
if len(filtered) < len(keywords) // 2:
filtered.extend(remaining[:len(keywords) - len(filtered)])
return filtered
# 如果没有特定偏好,返回原关键词
return keywords
def get_crawling_summary(self, target_date: date = None) -> Dict:
"""
获取爬取任务摘要
Args:
target_date: 目标日期
Returns:
爬取摘要信息
"""
if not target_date:
target_date = date.today()
topics_data = self.get_daily_topics(target_date)
if topics_data:
return {
'date': target_date,
'keywords_count': len(topics_data.get('keywords', [])),
'summary': topics_data.get('summary', ''),
'has_data': True
}
else:
return {
'date': target_date,
'keywords_count': 0,
'summary': '暂无数据',
'has_data': False
}
def close(self):
"""关闭数据库连接"""
if self.connection:
self.connection.close()
print("关键词管理器数据库连接已关闭")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
if __name__ == "__main__":
# 测试关键词管理器
with KeywordManager() as km:
# 测试获取关键词
keywords = km.get_latest_keywords(max_keywords=20)
print(f"获取到的关键词: {keywords}")
# 测试平台分配
platforms = ['xhs', 'dy', 'bili']
distribution = km.distribute_keywords_by_platform(keywords, platforms)
for platform, kws in distribution.items():
print(f"{platform}: {kws}")
# 测试爬取摘要
summary = km.get_crawling_summary()
print(f"爬取摘要: {summary}")
print("关键词管理器测试完成!")