search.py
29.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
"""
专为 AI Agent 设计的本地舆情数据库查询工具集 (MediaCrawlerDB)
版本: 3.0
最后更新: 2025-08-23
此脚本将复杂的本地MySQL数据库查询功能封装成一系列目标明确、参数清晰的独立工具,
专为AI Agent调用而设计。Agent只需根据任务意图(如搜索热点、全局搜索话题、
按时间范围分析、获取评论)选择合适的工具,无需编写复杂的SQL语句。
V3.0 核心更新:
- 智能热度计算: `search_hot_content`不再需要`sort_by`参数,改为内部使用统一的加权热度算法,
综合点赞、评论、分享、观看等数据计算热度分值,使结果更智能、更符合综合热度。
- 新增平台精搜工具: 新增 `search_topic_on_platform` 工具,作为特例,
允许Agent在特定平台(B站、微博等七大平台)上对某一话题进行精确搜索,并支持时间筛选。
- 结构优化: 调整了数据结构与函数文档,以适应新功能。
主要工具:
- search_hot_content: 查找指定时间范围内的综合热度最高的内容。
- search_topic_globally: 在整个数据库中全局搜索与特定话题相关的所有内容和评论。
- search_topic_by_date: 在指定的历史日期范围内搜索与特定话题相关的内容。
- get_comments_for_topic: 专门提取公众对于某一特定话题的评论数据。
- search_topic_on_platform: 在指定的单个社交媒体平台上搜索特定话题。
"""
import os
import json
import pymysql
import pymysql.cursors
from typing import List, Dict, Any, Optional, Literal
from dataclasses import dataclass, field
from datetime import datetime, timedelta, date
# --- 1. 数据结构定义 ---
@dataclass
class QueryResult:
    """Unified data class for a single row returned by any query tool."""
    platform: str  # platform key derived from the table name, e.g. 'bilibili', 'weibo'
    content_type: str  # 'video' / 'note' / 'content' / 'comment' / 'news'
    title_or_content: str  # title when present, otherwise body text (callers truncate to 500 chars)
    author_nickname: Optional[str] = None
    url: Optional[str] = None
    publish_time: Optional[datetime] = None  # normalized via MediaCrawlerDB._to_datetime
    engagement: Dict[str, int] = field(default_factory=dict)  # unified metrics, e.g. {'likes': 3, 'comments': 1}
    source_keyword: Optional[str] = None  # crawl keyword that originally surfaced this row
    hotness_score: float = 0.0  # weighted score; only populated by search_hot_content
    source_table: str = ""  # originating database table
@dataclass
class DBResponse:
    """Envelope wrapping a tool's complete return value."""
    tool_name: str  # name of the tool that produced this response
    parameters: Dict[str, Any]  # the parameters the tool was invoked with (for logging)
    results: List[QueryResult] = field(default_factory=list)
    results_count: int = 0  # len(results); kept explicit for easy serialization
    error_message: Optional[str] = None  # set (with empty results) when the tool fails
# --- 2. 核心客户端与专用工具集 ---
class MediaCrawlerDB:
    """Client bundling special-purpose query tools over the local MediaCrawler MySQL database."""
    # Hotness weights used by search_hot_content's composite formula.
    W_LIKE = 1.0
    W_COMMENT = 5.0
    W_SHARE = 10.0  # shares / forwards / favorites / coins: high-value interactions
    W_VIEW = 0.1
    W_DANMAKU = 0.5

    def __init__(self):
        """
        Initialize the client. Connection info is read from environment variables:
        - DB_HOST, DB_USER, DB_PASSWORD, DB_NAME
        - DB_PORT (optional, default 3306)
        - DB_CHARSET (optional, default utf8mb4)

        Raises:
            ValueError: if any of the required variables is missing/empty.
        """
        self.db_config = {
            'host': os.getenv("DB_HOST"),
            'user': os.getenv("DB_USER"),
            'password': os.getenv("DB_PASSWORD"),
            'db': os.getenv("DB_NAME"),
            'port': int(os.getenv("DB_PORT", 3306)),
            'charset': os.getenv("DB_CHARSET", "utf8mb4"),
            'cursorclass': pymysql.cursors.DictCursor
        }
        required = ['host', 'user', 'password', 'db']
        if missing := [k for k in required if not self.db_config[k]]:
            raise ValueError(f"数据库配置缺失! 请设置环境变量或在代码中提供: {', '.join([f'DB_{k.upper()}' for k in missing])}")

    def _execute_query(self, query: str, params: tuple = None) -> List[Dict[str, Any]]:
        """Open a connection, run one query, and return all rows as dicts.

        Best-effort by design: on any pymysql error the error is printed and an
        empty list is returned, so tools degrade gracefully instead of raising.
        """
        conn = None
        try:
            conn = pymysql.connect(**self.db_config)
            with conn.cursor() as cursor:
                cursor.execute(query, params or ())
                return cursor.fetchall()
        except pymysql.Error as e:
            print(f"数据库查询时发生错误: {e}")
            return []
        finally:
            if conn:
                conn.close()

    @staticmethod
    def _to_datetime(ts: Any) -> Optional[datetime]:
        """Normalize a heterogeneous timestamp to a datetime.

        Accepts datetime/date objects, epoch seconds or milliseconds (numeric or
        digit strings), and ISO-format strings. Returns None for falsy or
        unparseable input.
        """
        if not ts:
            return None
        try:
            if isinstance(ts, datetime):
                return ts
            if isinstance(ts, date):
                return datetime.combine(ts, datetime.min.time())
            if isinstance(ts, (int, float)) or str(ts).isdigit():
                val = float(ts)
                # Values above 1e12 are assumed to be epoch milliseconds.
                return datetime.fromtimestamp(val / 1000 if val > 1_000_000_000_000 else val)
            if isinstance(ts, str):
                # Drop any '+HH:MM' timezone suffix before parsing.
                return datetime.fromisoformat(ts.split('+')[0].strip())
        except (ValueError, TypeError):
            return None

    # Class-level cache of table name -> column names (shared across instances).
    _table_columns_cache = {}

    def _get_table_columns(self, table_name: str) -> List[str]:
        """Return the column names of `table_name`, caching SHOW COLUMNS results."""
        if table_name in self._table_columns_cache:
            return self._table_columns_cache[table_name]
        results = self._execute_query(f"SHOW COLUMNS FROM `{table_name}`")
        columns = [row['Field'] for row in results] if results else []
        self._table_columns_cache[table_name] = columns
        return columns

    def _extract_engagement(self, row: Dict[str, Any]) -> Dict[str, int]:
        """Extract and unify engagement metrics from a raw data row.

        For each unified metric, the first matching platform-specific column wins;
        unparseable values become 0 and absent columns are simply omitted.
        """
        engagement = {}
        mapping = {
            'likes': ['liked_count', 'like_count', 'voteup_count', 'comment_like_count'],
            'comments': ['video_comment', 'comments_count', 'comment_count', 'total_replay_num', 'sub_comment_count'],
            'shares': ['video_share_count', 'shared_count', 'share_count', 'total_forwards'],
            'views': ['video_play_count', 'viewd_count'],
            'favorites': ['video_favorite_count', 'collected_count'],
            'coins': ['video_coin_count'],
            'danmaku': ['video_danmaku'],
        }
        for key, potential_cols in mapping.items():
            for col in potential_cols:
                if col in row and row[col] is not None:
                    try:
                        engagement[key] = int(row[col])
                    except (ValueError, TypeError):
                        engagement[key] = 0
                    break
        return engagement

    def search_hot_content(
        self,
        time_period: Literal['24h', 'week', 'year'] = 'week',
        limit: int = 50
    ) -> DBResponse:
        """
        [Tool] Find hot content: fetch the highest composite-hotness content in a recent window.

        Args:
            time_period (Literal['24h', 'week', 'year']): Time window, defaults to 'week'.
                Any other value falls back to a one-year window.
            limit (int): Maximum number of results, defaults to 50.

        Returns:
            DBResponse: Content list sorted by composite hotness (descending).
        """
        params_for_log = {'time_period': time_period, 'limit': limit}
        print(f"--- TOOL: 查找热点内容 (params: {params_for_log}) ---")
        now = datetime.now()
        start_time = now - timedelta(days={'24h': 1, 'week': 7}.get(time_period, 365))
        # Per-platform SQL fragments implementing the weighted hotness formula.
        hotness_formulas = {
            'bilibili_video': f"(COALESCE(CAST(liked_count AS UNSIGNED), 0) * {self.W_LIKE} + COALESCE(CAST(video_comment AS UNSIGNED), 0) * {self.W_COMMENT} + COALESCE(CAST(video_share_count AS UNSIGNED), 0) * {self.W_SHARE} + COALESCE(CAST(video_favorite_count AS UNSIGNED), 0) * {self.W_SHARE} + COALESCE(CAST(video_coin_count AS UNSIGNED), 0) * {self.W_SHARE} + COALESCE(CAST(video_danmaku AS UNSIGNED), 0) * {self.W_DANMAKU} + COALESCE(CAST(video_play_count AS DECIMAL(20,2)), 0) * {self.W_VIEW})",
            'douyin_aweme': f"(COALESCE(CAST(liked_count AS UNSIGNED), 0) * {self.W_LIKE} + COALESCE(CAST(comment_count AS UNSIGNED), 0) * {self.W_COMMENT} + COALESCE(CAST(share_count AS UNSIGNED), 0) * {self.W_SHARE} + COALESCE(CAST(collected_count AS UNSIGNED), 0) * {self.W_SHARE})",
            'weibo_note': f"(COALESCE(CAST(liked_count AS UNSIGNED), 0) * {self.W_LIKE} + COALESCE(CAST(comments_count AS UNSIGNED), 0) * {self.W_COMMENT} + COALESCE(CAST(shared_count AS UNSIGNED), 0) * {self.W_SHARE})",
            'xhs_note': f"(COALESCE(CAST(liked_count AS UNSIGNED), 0) * {self.W_LIKE} + COALESCE(CAST(comment_count AS UNSIGNED), 0) * {self.W_COMMENT} + COALESCE(CAST(share_count AS UNSIGNED), 0) * {self.W_SHARE} + COALESCE(CAST(collected_count AS UNSIGNED), 0) * {self.W_SHARE})",
            'kuaishou_video': f"(COALESCE(CAST(liked_count AS UNSIGNED), 0) * {self.W_LIKE} + COALESCE(CAST(viewd_count AS DECIMAL(20,2)), 0) * {self.W_VIEW})",
            'zhihu_content': f"(COALESCE(CAST(voteup_count AS UNSIGNED), 0) * {self.W_LIKE} + COALESCE(CAST(comment_count AS UNSIGNED), 0) * {self.W_COMMENT})",
        }
        all_queries, params = [], []
        for table, formula in hotness_formulas.items():
            # Each table stores publish time differently: datetime string, epoch seconds, or epoch ms.
            if table == 'weibo_note':
                time_filter_sql, time_filter_param = "`create_date_time` >= %s", start_time.strftime('%Y-%m-%d %H:%M:%S')
            elif table in ['kuaishou_video', 'xhs_note', 'douyin_aweme']:
                time_col = 'time' if table == 'xhs_note' else 'create_time'
                time_filter_sql, time_filter_param = f"`{time_col}` >= %s", str(int(start_time.timestamp() * 1000))
            elif table == 'zhihu_content':
                time_filter_sql, time_filter_param = "CAST(`created_time` AS UNSIGNED) >= %s", str(int(start_time.timestamp()))
            else:
                time_filter_sql, time_filter_param = "`create_time` >= %s", str(int(start_time.timestamp()))
            content_type = 'note' if table in ['weibo_note', 'xhs_note'] else 'content' if table == 'zhihu_content' else 'video'
            query_template = "SELECT '{platform}' as p, '{type}' as t, {title} as title, {author} as author, {url} as url, {ts} as ts, {formula} as hotness_score, source_keyword, '{tbl}' as tbl FROM `{tbl}` WHERE {time_filter}"
            field_subs = {'platform': table.split('_')[0], 'type': content_type, 'title': 'title', 'author': 'nickname', 'url': 'video_url', 'ts': 'create_time', 'formula': formula, 'tbl': table, 'time_filter': time_filter_sql}
            # Per-table column-name overrides for the shared SELECT template.
            if table == 'weibo_note':
                field_subs.update({'title': 'content', 'url': 'note_url', 'ts': 'create_date_time'})
            elif table == 'xhs_note':
                field_subs.update({'ts': 'time', 'url': 'note_url'})
            elif table == 'zhihu_content':
                field_subs.update({'author': 'user_nickname', 'url': 'content_url', 'ts': 'created_time'})
            elif table == 'douyin_aweme':
                field_subs.update({'url': 'aweme_url'})
            all_queries.append(query_template.format(**field_subs))
            params.append(time_filter_param)
        # UNION ALL across platforms, then one global ORDER BY on the computed score.
        final_query = f"({' ) UNION ALL ( '.join(all_queries)}) ORDER BY hotness_score DESC LIMIT %s"
        raw_results = self._execute_query(final_query, tuple(params) + (limit,))
        formatted_results = [
            QueryResult(
                platform=r['p'], content_type=r['t'], title_or_content=r['title'],
                author_nickname=r.get('author'), url=r['url'],
                publish_time=self._to_datetime(r['ts']),
                engagement=self._extract_engagement(r),
                hotness_score=r.get('hotness_score', 0.0),
                source_keyword=r.get('source_keyword'),
                source_table=r['tbl'],
            )
            for r in raw_results
        ]
        return DBResponse("search_hot_content", params_for_log, results=formatted_results, results_count=len(formatted_results))

    def search_topic_globally(self, topic: str, limit_per_table: int = 100) -> DBResponse:
        """
        [Tool] Global topic search: search the whole database (content, comments, tags,
        source keywords) for the given topic.

        Args:
            topic (str): Topic keyword to search for (substring LIKE match).
            limit_per_table (int): Max records returned per table, defaults to 100.

        Returns:
            DBResponse: Aggregated list of all matches across tables.
        """
        params_for_log = {'topic': topic, 'limit_per_table': limit_per_table}
        print(f"--- TOOL: 全局话题搜索 (params: {params_for_log}) ---")
        search_term, all_results = f"%{topic}%", []
        # Which columns are searchable on each table, and what content type its rows map to.
        search_configs = {
            'bilibili_video': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'video'},
            'bilibili_video_comment': {'fields': ['content'], 'type': 'comment'},
            'douyin_aweme': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'video'},
            'douyin_aweme_comment': {'fields': ['content'], 'type': 'comment'},
            'kuaishou_video': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'video'},
            'kuaishou_video_comment': {'fields': ['content'], 'type': 'comment'},
            'weibo_note': {'fields': ['content', 'source_keyword'], 'type': 'note'},
            'weibo_note_comment': {'fields': ['content'], 'type': 'comment'},
            'xhs_note': {'fields': ['title', 'desc', 'tag_list', 'source_keyword'], 'type': 'note'},
            'xhs_note_comment': {'fields': ['content'], 'type': 'comment'},
            'zhihu_content': {'fields': ['title', 'desc', 'content_text', 'source_keyword'], 'type': 'content'},
            'zhihu_comment': {'fields': ['content'], 'type': 'comment'},
            'tieba_note': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'note'},
            'tieba_comment': {'fields': ['content'], 'type': 'comment'},
            'daily_news': {'fields': ['title'], 'type': 'news'},
        }
        for table, config in search_configs.items():
            where_clause = " OR ".join([f"`{field}` LIKE %s" for field in config['fields']])
            query = f"SELECT * FROM `{table}` WHERE {where_clause} ORDER BY id DESC LIMIT %s"
            params = (search_term,) * len(config['fields']) + (limit_per_table,)
            raw_results = self._execute_query(query, params)
            for row in raw_results:
                content = (row.get('title') or row.get('content') or row.get('desc') or row.get('content_text', ''))
                # Tables name their timestamp column differently; take the first present.
                time_key = row.get('create_time') or row.get('time') or row.get('created_time') or row.get('publish_time') or row.get('crawl_date')
                all_results.append(QueryResult(
                    platform=table.split('_')[0], content_type=config['type'],
                    title_or_content=content[:500] if content else '',
                    author_nickname=row.get('nickname') or row.get('user_nickname') or row.get('user_name'),
                    url=row.get('video_url') or row.get('note_url') or row.get('content_url') or row.get('url') or row.get('aweme_url'),
                    publish_time=self._to_datetime(time_key),
                    engagement=self._extract_engagement(row),
                    source_keyword=row.get('source_keyword'),
                    source_table=table
                ))
        return DBResponse("search_topic_globally", params_for_log, results=all_results, results_count=len(all_results))

    def search_topic_by_date(self, topic: str, start_date: str, end_date: str, limit_per_table: int = 100) -> DBResponse:
        """
        [Tool] Search a topic by date: search an explicit historical period for topic-related content.

        Args:
            topic (str): Topic keyword to search for.
            start_date (str): Start date, 'YYYY-MM-DD' (inclusive).
            end_date (str): End date, 'YYYY-MM-DD' (inclusive — internally the upper
                bound is exclusive at end_date + 1 day).
            limit_per_table (int): Max records returned per table, defaults to 100.

        Returns:
            DBResponse: Aggregated matches within the date range, or an error_message
            when the dates are malformed.
        """
        params_for_log = {'topic': topic, 'start_date': start_date, 'end_date': end_date, 'limit_per_table': limit_per_table}
        print(f"--- TOOL: 按日期搜索话题 (params: {params_for_log}) ---")
        try:
            start_dt, end_dt = datetime.strptime(start_date, '%Y-%m-%d'), datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=1)
        except ValueError:
            return DBResponse("search_topic_by_date", params_for_log, error_message="日期格式错误,请使用 'YYYY-MM-DD' 格式。")
        search_term, all_results = f"%{topic}%", []
        # 'time_type' describes how each table stores its timestamp:
        # sec = epoch seconds, ms = epoch milliseconds, str/date_str = datetime/date
        # strings, sec_str = epoch seconds stored as a string column.
        search_configs = {
            'bilibili_video': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'video', 'time_col': 'create_time', 'time_type': 'sec'},
            'douyin_aweme': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'video', 'time_col': 'create_time', 'time_type': 'ms'},
            'kuaishou_video': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'video', 'time_col': 'create_time', 'time_type': 'ms'},
            'weibo_note': {'fields': ['content', 'source_keyword'], 'type': 'note', 'time_col': 'create_date_time', 'time_type': 'str'},
            'xhs_note': {'fields': ['title', 'desc', 'tag_list', 'source_keyword'], 'type': 'note', 'time_col': 'time', 'time_type': 'ms'},
            'zhihu_content': {'fields': ['title', 'desc', 'content_text', 'source_keyword'], 'type': 'content', 'time_col': 'created_time', 'time_type': 'sec_str'},
            'tieba_note': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'note', 'time_col': 'publish_time', 'time_type': 'str'},
            'daily_news': {'fields': ['title'], 'type': 'news', 'time_col': 'crawl_date', 'time_type': 'date_str'},
        }
        for table, config in search_configs.items():
            topic_clause = " OR ".join([f"`{field}` LIKE %s" for field in config['fields']])
            time_col, time_type = config['time_col'], config['time_type']
            if time_type == 'sec':
                time_params = (int(start_dt.timestamp()), int(end_dt.timestamp()))
            elif time_type == 'ms':
                time_params = (int(start_dt.timestamp() * 1000), int(end_dt.timestamp() * 1000))
            elif time_type in ['str', 'date_str']:
                time_params = (start_dt.strftime('%Y-%m-%d'), end_dt.strftime('%Y-%m-%d'))
            else:
                # 'sec_str': epoch seconds stored as strings.
                time_params = (str(int(start_dt.timestamp())), str(int(end_dt.timestamp())))
            time_clause = f"`{time_col}` >= %s AND `{time_col}` < %s"
            if table == 'zhihu_content':
                time_clause = f"CAST(`{time_col}` AS UNSIGNED) >= %s AND CAST(`{time_col}` AS UNSIGNED) < %s"
            query = f"SELECT * FROM `{table}` WHERE ({topic_clause}) AND ({time_clause}) ORDER BY id DESC LIMIT %s"
            params = (search_term,) * len(config['fields']) + time_params + (limit_per_table,)
            raw_results = self._execute_query(query, params)
            for row in raw_results:
                content = (row.get('title') or row.get('content') or row.get('desc') or row.get('content_text', ''))
                all_results.append(QueryResult(
                    platform=table.split('_')[0], content_type=config['type'],
                    title_or_content=content[:500] if content else '',
                    author_nickname=row.get('nickname') or row.get('user_nickname'),
                    url=row.get('video_url') or row.get('note_url') or row.get('content_url') or row.get('url') or row.get('aweme_url'),
                    publish_time=self._to_datetime(row.get(config['time_col'])),
                    engagement=self._extract_engagement(row),
                    source_keyword=row.get('source_keyword'),
                    source_table=table
                ))
        return DBResponse("search_topic_by_date", params_for_log, results=all_results, results_count=len(all_results))

    def get_comments_for_topic(self, topic: str, limit: int = 500) -> DBResponse:
        """
        [Tool] Fetch topic comments: return public comments mentioning the topic from all platforms.

        Args:
            topic (str): Topic keyword to search for.
            limit (int): Overall cap on returned comments, defaults to 500.

        Returns:
            DBResponse: Matching comments, newest first.
        """
        params_for_log = {'topic': topic, 'limit': limit}
        print(f"--- TOOL: 获取话题评论 (params: {params_for_log}) ---")
        search_term = f"%{topic}%"
        comment_tables = ['bilibili_video_comment', 'douyin_aweme_comment', 'kuaishou_video_comment', 'weibo_note_comment', 'xhs_note_comment', 'zhihu_comment', 'tieba_comment']
        all_queries = []
        for table in comment_tables:
            # Column names vary per platform; probe the actual schema once (cached).
            cols = self._get_table_columns(table)
            author_col = 'user_nickname' if 'user_nickname' in cols else 'nickname'
            like_col = 'comment_like_count' if 'comment_like_count' in cols else 'like_count' if 'like_count' in cols else None
            time_col = 'publish_time' if 'publish_time' in cols else 'create_date_time' if 'create_date_time' in cols else 'create_time'
            like_select = f"`{like_col}` as likes" if like_col else "'0' as likes"
            query = (f"SELECT '{table.split('_')[0]}' as platform, `content`, `{author_col}` as author, "
                     f"`{time_col}` as ts, {like_select}, '{table}' as source_table "
                     f"FROM `{table}` WHERE `content` LIKE %s")
            all_queries.append(query)
        final_query = f"({' ) UNION ALL ( '.join(all_queries)}) ORDER BY ts DESC LIMIT %s"
        params = (search_term,) * len(comment_tables) + (limit,)
        raw_results = self._execute_query(final_query, params)
        formatted = [
            QueryResult(
                platform=r['platform'], content_type='comment', title_or_content=r['content'],
                author_nickname=r['author'], publish_time=self._to_datetime(r['ts']),
                engagement={'likes': int(r['likes']) if str(r['likes']).isdigit() else 0},
                source_table=r['source_table'],
            )
            for r in raw_results
        ]
        return DBResponse("get_comments_for_topic", params_for_log, results=formatted, results_count=len(formatted))

    def search_topic_on_platform(
        self,
        platform: Literal['bilibili', 'weibo', 'douyin', 'kuaishou', 'xhs', 'zhihu', 'tieba'],
        topic: str,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        limit: int = 20
    ) -> DBResponse:
        """
        [Tool] Platform-scoped search: search a single social-media platform for a topic.

        Args:
            platform (Literal['bilibili', ...]): Platform to search; must be one of the seven supported.
            topic (str): Topic keyword to search for.
            start_date (Optional[str]): Start date, 'YYYY-MM-DD'. Defaults to None (no time filter).
            end_date (Optional[str]): End date, 'YYYY-MM-DD'. Defaults to None.
            limit (int): Max results per table, defaults to 20.

        Returns:
            DBResponse: Matches found on that platform, or an error_message on bad input.
        """
        params_for_log = {'platform': platform, 'topic': topic, 'start_date': start_date, 'end_date': end_date, 'limit': limit}
        print(f"--- TOOL: 平台定向搜索 (params: {params_for_log}) ---")
        all_configs = { 'bilibili': [{'table': 'bilibili_video', 'fields': ['title', 'desc', 'source_keyword'], 'type': 'video', 'time_col': 'create_time', 'time_type': 'sec'}, {'table': 'bilibili_video_comment', 'fields': ['content'], 'type': 'comment'}], 'douyin': [{'table': 'douyin_aweme', 'fields': ['title', 'desc', 'source_keyword'], 'type': 'video', 'time_col': 'create_time', 'time_type': 'ms'}, {'table': 'douyin_aweme_comment', 'fields': ['content'], 'type': 'comment'}], 'kuaishou': [{'table': 'kuaishou_video', 'fields': ['title', 'desc', 'source_keyword'], 'type': 'video', 'time_col': 'create_time', 'time_type': 'ms'}, {'table': 'kuaishou_video_comment', 'fields': ['content'], 'type': 'comment'}], 'weibo': [{'table': 'weibo_note', 'fields': ['content', 'source_keyword'], 'type': 'note', 'time_col': 'create_date_time', 'time_type': 'str'}, {'table': 'weibo_note_comment', 'fields': ['content'], 'type': 'comment'}], 'xhs': [{'table': 'xhs_note', 'fields': ['title', 'desc', 'tag_list', 'source_keyword'], 'type': 'note', 'time_col': 'time', 'time_type': 'ms'}, {'table': 'xhs_note_comment', 'fields': ['content'], 'type': 'comment'}], 'zhihu': [{'table': 'zhihu_content', 'fields': ['title', 'desc', 'content_text', 'source_keyword'], 'type': 'content', 'time_col': 'created_time', 'time_type': 'sec_str'}, {'table': 'zhihu_comment', 'fields': ['content'], 'type': 'comment'}], 'tieba': [{'table': 'tieba_note', 'fields': ['title', 'desc', 'source_keyword'], 'type': 'note', 'time_col': 'publish_time', 'time_type': 'str'}, {'table': 'tieba_comment', 'fields': ['content'], 'type': 'comment'}] }
        if platform not in all_configs:
            return DBResponse("search_topic_on_platform", params_for_log, error_message=f"不支持的平台: {platform}")
        search_term, all_results = f"%{topic}%", []
        platform_configs = all_configs[platform]
        if start_date and end_date:
            try:
                start_dt, end_dt = datetime.strptime(start_date, '%Y-%m-%d'), datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=1)
            except ValueError:
                return DBResponse("search_topic_on_platform", params_for_log, error_message="日期格式错误,请使用 'YYYY-MM-DD' 格式。")
        else:
            start_dt, end_dt = None, None
        for config in platform_configs:
            table = config['table']
            topic_clause = " OR ".join([f"`{field}` LIKE %s" for field in config['fields']])
            # BUGFIX: the OR'd topic fields must be parenthesized; otherwise a later
            # "AND <time filter>" binds only to the last OR term (SQL AND > OR precedence),
            # silently dropping the date filter for the other fields.
            query = f"SELECT * FROM `{table}` WHERE ({topic_clause})"
            params = [search_term] * len(config['fields'])
            if start_dt and end_dt and 'time_col' in config:
                time_col, time_type = config['time_col'], config['time_type']
                if time_type == 'sec':
                    t_params = (int(start_dt.timestamp()), int(end_dt.timestamp()))
                elif time_type == 'ms':
                    t_params = (int(start_dt.timestamp() * 1000), int(end_dt.timestamp() * 1000))
                elif time_type in ['str', 'date_str']:
                    t_params = (start_dt.strftime('%Y-%m-%d'), end_dt.strftime('%Y-%m-%d'))
                else:
                    t_params = (str(int(start_dt.timestamp())), str(int(end_dt.timestamp())))
                t_clause = f"`{time_col}` >= %s AND `{time_col}` < %s"
                if table == 'zhihu_content':
                    t_clause = f"CAST(`{time_col}` AS UNSIGNED) >= %s AND CAST(`{time_col}` AS UNSIGNED) < %s"
                query += f" AND ({t_clause})"
                params.extend(t_params)
            query += " ORDER BY id DESC LIMIT %s"
            params.append(limit)
            raw_results = self._execute_query(query, tuple(params))
            for row in raw_results:
                content = (row.get('title') or row.get('content') or row.get('desc') or row.get('content_text', ''))
                time_col_name = config.get('time_col')
                time_key = row.get(time_col_name) if time_col_name else None
                all_results.append(QueryResult(
                    platform=platform, content_type=config['type'],
                    title_or_content=content[:500] if content else '',
                    author_nickname=row.get('nickname') or row.get('user_nickname'),
                    url=row.get('video_url') or row.get('note_url') or row.get('content_url') or row.get('url') or row.get('aweme_url'),
                    publish_time=self._to_datetime(time_key),
                    engagement=self._extract_engagement(row),
                    source_keyword=row.get('source_keyword'),
                    source_table=table
                ))
        return DBResponse("search_topic_on_platform", params_for_log, results=all_results, results_count=len(all_results))
# --- 3. 测试与使用示例 ---
def print_response_summary(response: DBResponse):
    """Compact console printer used by the demo scenarios below."""
    if response.error_message:
        # Error path: report the failure and bail out early.
        print(f"工具 '{response.tool_name}' 执行出错: {response.error_message}")
        print("-" * 80)
        return
    rendered_params = ", ".join(f"{key}='{value}'" for key, value in response.parameters.items())
    print(f"查询: 工具='{response.tool_name}', 参数=[{rendered_params}]")
    print(f"找到 {response.results_count} 条相关记录。")
    if response.results:
        print("--- 前5条结果示例 ---")
        for idx, item in enumerate(response.results[:5], start=1):
            engagement_bits = ", ".join(f"{name}: {count}" for name, count in item.engagement.items() if count)
            text = item.title_or_content
            if text and len(text) > 70:
                preview = text.replace('\n', ' ')[:70] + '...'
            else:
                preview = text
            when = item.publish_time.strftime('%Y-%m-%d %H:%M') if item.publish_time else 'N/A'
            hotness_suffix = f", hotness: {item.hotness_score:.2f}" if item.hotness_score > 0 else ""
            print(
                f"{idx}. [{item.platform.upper()}/{item.content_type}] {preview}\n"
                f" by: {item.author_nickname}, at: {when}"
                f", src_kw: '{item.source_keyword or 'N/A'}'{hotness_suffix}"
                f", engagement: {{{engagement_bits}}}"
            )
    print("-" * 80)
if __name__ == "__main__":
try:
db_agent_tools = MediaCrawlerDB()
print("数据库工具初始化成功,开始执行测试场景...\n")
# 场景1: (新) 查找过去一周综合热度最高的内容 (不再需要sort_by)
response1 = db_agent_tools.search_hot_content(time_period='week', limit=5)
print_response_summary(response1)
# 场景2: 查找过去24小时内综合热度最高的内容
response2 = db_agent_tools.search_hot_content(time_period='24h', limit=5)
print_response_summary(response2)
# 场景3: 全局搜索"罗永浩"
response3 = db_agent_tools.search_topic_globally(topic="罗永浩", limit_per_table=2)
print_response_summary(response3)
# 场景4: (新增) 在B站上精确搜索"论文"
response4 = db_agent_tools.search_topic_on_platform(platform='bilibili', topic="论文", limit=5)
print_response_summary(response4)
# 场景5: (新增) 在微博上精确搜索 "许凯" 在特定一天内的内容
response5 = db_agent_tools.search_topic_on_platform(platform='weibo', topic="许凯", start_date='2025-08-22', end_date='2025-08-22', limit=5)
print_response_summary(response5)
except ValueError as e:
print(f"初始化失败: {e}")
print("请确保相关的数据库环境变量已正确设置, 或在代码中直接提供连接信息。")
except Exception as e:
print(f"测试过程中发生未知错误: {e}")