戒酒的李白

Private database operation tools completed.

"""
专为 AI Agent 设计的舆情搜索工具集 (Tavily)
专为 AI Agent 设计的本地舆情数据库查询工具集 (MediaCrawlerDB)
版本: 1.5
版本: 3.0
最后更新: 2025-08-22
此脚本将复杂的Tavily搜索功能分解为一系列目标明确、参数极少的独立工具,
专为AI Agent调用而设计。Agent只需根据任务意图选择合适的工具,
无需理解复杂的参数组合。所有工具默认搜索“新闻”(topic='news')。
此脚本将复杂的本地MySQL数据库查询功能封装成一系列目标明确、参数清晰的独立工具,
专为AI Agent调用而设计。Agent只需根据任务意图(如搜索热点、全局搜索话题、
按时间范围分析、获取评论)选择合适的工具,无需编写复杂的SQL语句。
新特性:
- 新增 `basic_search_news` 工具,用于执行标准、通用的新闻搜索。
- 每个搜索结果现在都包含 `published_date` (新闻发布日期)。
V3.0 核心更新:
- 智能热度计算: `search_hot_content`不再需要`sort_by`参数,改为内部使用统一的加权热度算法,
综合点赞、评论、分享、观看等数据计算热度分值,使结果更智能、更符合综合热度。
- 新增平台精搜工具: 新增 `search_topic_on_platform` 工具,作为特例,
允许Agent在特定平台(B站、微博等七大平台)上对某一话题进行精确搜索,并支持时间筛选。
- 结构优化: 调整了数据结构与函数文档,以适应新功能。
主要工具:
- basic_search_news: (新增) 执行标准、快速的通用新闻搜索。
- deep_search_news: 对主题进行最全面的深度分析。
- search_news_last_24_hours: 获取24小时内的最新动态。
- search_news_last_week: 获取过去一周的主要报道。
- search_images_for_news: 查找与新闻主题相关的图片。
- search_news_by_date: 在指定的历史日期范围内搜索。
- search_hot_content: 查找指定时间范围内的综合热度最高的内容。
- search_topic_globally: 在整个数据库中全局搜索与特定话题相关的所有内容和评论。
- search_topic_by_date: 在指定的历史日期范围内搜索与特定话题相关的内容。
- get_comments_for_topic: 专门提取公众对于某一特定话题的评论数据。
- search_topic_on_platform: 在指定的单个社交媒体平台上搜索特定话题。
"""
import os
from typing import List, Dict, Any, Optional
import json
import pymysql
import pymysql.cursors
from typing import List, Dict, Any, Optional, Literal
from dataclasses import dataclass, field
# 运行前请确保已安装Tavily库: pip install tavily-python
try:
from tavily import TavilyClient
except ImportError:
raise ImportError("Tavily库未安装,请运行 `pip install tavily-python` 进行安装。")
from datetime import datetime, timedelta, date
# --- 1. 数据结构定义 ---
@dataclass
class SearchResult:
"""
网页搜索结果数据类
包含 published_date 属性来存储新闻发布日期
"""
title: str
url: str
content: str
score: Optional[float] = None
raw_content: Optional[str] = None
published_date: Optional[str] = None
@dataclass
class ImageResult:
"""图片搜索结果数据类"""
url: str
description: Optional[str] = None
class QueryResult:
"""统一的数据库查询结果数据类"""
platform: str
content_type: str
title_or_content: str
author_nickname: Optional[str] = None
url: Optional[str] = None
publish_time: Optional[datetime] = None
engagement: Dict[str, int] = field(default_factory=dict)
source_keyword: Optional[str] = None
hotness_score: float = 0.0 # 新增:综合热度分
source_table: str = ""
@dataclass
class TavilyResponse:
"""封装Tavily API的完整返回结果,以便在工具间传递"""
query: str
answer: Optional[str] = None
results: List[SearchResult] = field(default_factory=list)
images: List[ImageResult] = field(default_factory=list)
response_time: Optional[float] = None
class DBResponse:
"""封装工具的完整返回结果"""
tool_name: str
parameters: Dict[str, Any]
results: List[QueryResult] = field(default_factory=list)
results_count: int = 0
error_message: Optional[str] = None
# --- 2. 核心客户端与专用工具集 ---
class TavilyNewsAgency:
"""
一个包含多种专用新闻舆情搜索工具的客户端。
每个公共方法都设计为供 AI Agent 独立调用的工具。
"""
class MediaCrawlerDB:
"""包含多种专用舆情数据库查询工具的客户端"""
# 权重定义
W_LIKE = 1.0
W_COMMENT = 5.0
W_SHARE = 10.0 # 分享/转发/收藏/投币等高价值互动
W_VIEW = 0.1
W_DANMAKU = 0.5
def __init__(self, api_key: Optional[str] = None):
def __init__(self):
"""
初始化客户端。
Args:
api_key: Tavily API密钥,若不提供则从环境变量 TAVILY_API_KEY 读取。
初始化客户端。连接信息从环境变量自动读取:
- DB_HOST, DB_USER, DB_PASSWORD, DB_NAME
- DB_PORT (可选, 默认 3306)
- DB_CHARSET (可选, 默认 utf8mb4)
"""
if api_key is None:
api_key = os.getenv("TAVILY_API_KEY")
if not api_key:
raise ValueError("Tavily API Key未找到!请设置TAVILY_API_KEY环境变量或在初始化时提供")
self._client = TavilyClient(api_key=api_key)
def _search_internal(self, **kwargs) -> TavilyResponse:
"""内部通用的搜索执行器,所有工具最终都调用此方法"""
self.db_config = {
'host': os.getenv("DB_HOST"),
'user': os.getenv("DB_USER"),
'password': os.getenv("DB_PASSWORD"),
'db': os.getenv("DB_NAME"),
'port': int(os.getenv("DB_PORT", 3306)),
'charset': os.getenv("DB_CHARSET", "utf8mb4"),
'cursorclass': pymysql.cursors.DictCursor
}
required = ['host', 'user', 'password', 'db']
if missing := [k for k in required if not self.db_config[k]]:
raise ValueError(f"数据库配置缺失! 请设置环境变量或在代码中提供: {', '.join([f'DB_{k.upper()}' for k in missing])}")
def _execute_query(self, query: str, params: tuple = None) -> List[Dict[str, Any]]:
conn = None
try:
kwargs['topic'] = 'general'
api_params = {k: v for k, v in kwargs.items() if v is not None}
response_dict = self._client.search(**api_params)
search_results = [
SearchResult(
title=item.get('title'),
url=item.get('url'),
content=item.get('content'),
score=item.get('score'),
raw_content=item.get('raw_content'),
published_date=item.get('published_date')
) for item in response_dict.get('results', [])
]
image_results = [ImageResult(url=item.get('url'), description=item.get('description')) for item in response_dict.get('images', [])]
return TavilyResponse(
query=response_dict.get('query'), answer=response_dict.get('answer'),
results=search_results, images=image_results,
response_time=response_dict.get('response_time')
)
except Exception as e:
print(f"搜索时发生错误: {str(e)}")
return TavilyResponse(query=kwargs.get("query", "Unknown Query"))
conn = pymysql.connect(**self.db_config)
with conn.cursor() as cursor:
cursor.execute(query, params or ())
return cursor.fetchall()
except pymysql.Error as e:
print(f"数据库查询时发生错误: {e}")
return []
finally:
if conn: conn.close()
@staticmethod
def _to_datetime(ts: Any) -> Optional[datetime]:
if not ts: return None
try:
if isinstance(ts, datetime): return ts
if isinstance(ts, date): return datetime.combine(ts, datetime.min.time())
if isinstance(ts, (int, float)) or str(ts).isdigit():
val = float(ts)
return datetime.fromtimestamp(val / 1000 if val > 1_000_000_000_000 else val)
if isinstance(ts, str):
return datetime.fromisoformat(ts.split('+')[0].strip())
except (ValueError, TypeError): return None
_table_columns_cache = {}
def _get_table_columns(self, table_name: str) -> List[str]:
if table_name in self._table_columns_cache: return self._table_columns_cache[table_name]
results = self._execute_query(f"SHOW COLUMNS FROM `{table_name}`")
columns = [row['Field'] for row in results] if results else []
self._table_columns_cache[table_name] = columns
return columns
def _extract_engagement(self, row: Dict[str, Any]) -> Dict[str, int]:
"""从数据行中提取并统一互动指标"""
engagement = {}
mapping = { 'likes': ['liked_count', 'like_count', 'voteup_count', 'comment_like_count'], 'comments': ['video_comment', 'comments_count', 'comment_count', 'total_replay_num', 'sub_comment_count'], 'shares': ['video_share_count', 'shared_count', 'share_count', 'total_forwards'], 'views': ['video_play_count', 'viewd_count'], 'favorites': ['video_favorite_count', 'collected_count'], 'coins': ['video_coin_count'], 'danmaku': ['video_danmaku'], }
for key, potential_cols in mapping.items():
for col in potential_cols:
if col in row and row[col] is not None:
try: engagement[key] = int(row[col])
except (ValueError, TypeError): engagement[key] = 0
break
return engagement
def search_hot_content(
self,
time_period: Literal['24h', 'week', 'year'] = 'week',
limit: int = 10
) -> DBResponse:
"""
【工具】查找热点内容: (已简化) 获取最近一段时间内综合热度最高的内容。
# --- Agent 可用的工具方法 ---
Args:
time_period (Literal['24h', 'week', 'year']): 时间范围,默认为 'week'。
limit (int): 返回结果的最大数量,默认为 10。
def basic_search_news(self, query: str, max_results: int = 7) -> TavilyResponse:
Returns:
DBResponse: 包含按综合热度排序后的内容列表。
"""
【工具】基础新闻搜索: 执行一次标准、快速的新闻搜索。
这是最常用的通用搜索工具,适用于不确定需要何种特定搜索时。
Agent可提供搜索查询(query)和可选的最大结果数(max_results)。
params_for_log = {'time_period': time_period, 'limit': limit}
print(f"--- TOOL: 查找热点内容 (params: {params_for_log}) ---")
now = datetime.now()
start_time = now - timedelta(days={'24h': 1, 'week': 7}.get(time_period, 365))
# 定义各平台的热度计算SQL片段
hotness_formulas = {
'bilibili_video': f"(COALESCE(CAST(liked_count AS UNSIGNED), 0) * {self.W_LIKE} + COALESCE(CAST(video_comment AS UNSIGNED), 0) * {self.W_COMMENT} + COALESCE(CAST(video_share_count AS UNSIGNED), 0) * {self.W_SHARE} + COALESCE(CAST(video_favorite_count AS UNSIGNED), 0) * {self.W_SHARE} + COALESCE(CAST(video_coin_count AS UNSIGNED), 0) * {self.W_SHARE} + COALESCE(CAST(video_danmaku AS UNSIGNED), 0) * {self.W_DANMAKU} + COALESCE(CAST(video_play_count AS DECIMAL(20,2)), 0) * {self.W_VIEW})",
'douyin_aweme': f"(COALESCE(CAST(liked_count AS UNSIGNED), 0) * {self.W_LIKE} + COALESCE(CAST(comment_count AS UNSIGNED), 0) * {self.W_COMMENT} + COALESCE(CAST(share_count AS UNSIGNED), 0) * {self.W_SHARE} + COALESCE(CAST(collected_count AS UNSIGNED), 0) * {self.W_SHARE})",
'weibo_note': f"(COALESCE(CAST(liked_count AS UNSIGNED), 0) * {self.W_LIKE} + COALESCE(CAST(comments_count AS UNSIGNED), 0) * {self.W_COMMENT} + COALESCE(CAST(shared_count AS UNSIGNED), 0) * {self.W_SHARE})",
'xhs_note': f"(COALESCE(CAST(liked_count AS UNSIGNED), 0) * {self.W_LIKE} + COALESCE(CAST(comment_count AS UNSIGNED), 0) * {self.W_COMMENT} + COALESCE(CAST(share_count AS UNSIGNED), 0) * {self.W_SHARE} + COALESCE(CAST(collected_count AS UNSIGNED), 0) * {self.W_SHARE})",
'kuaishou_video': f"(COALESCE(CAST(liked_count AS UNSIGNED), 0) * {self.W_LIKE} + COALESCE(CAST(viewd_count AS DECIMAL(20,2)), 0) * {self.W_VIEW})",
'zhihu_content': f"(COALESCE(CAST(voteup_count AS UNSIGNED), 0) * {self.W_LIKE} + COALESCE(CAST(comment_count AS UNSIGNED), 0) * {self.W_COMMENT})",
}
all_queries, params = [], []
for table, formula in hotness_formulas.items():
time_filter_sql, time_filter_param = "", None
if table == 'weibo_note': time_filter_sql, time_filter_param = "`create_date_time` >= %s", start_time.strftime('%Y-%m-%d %H:%M:%S')
elif table in ['kuaishou_video', 'xhs_note', 'douyin_aweme']: time_col = 'time' if table == 'xhs_note' else 'create_time'; time_filter_sql, time_filter_param = f"`{time_col}` >= %s", str(int(start_time.timestamp() * 1000))
elif table == 'zhihu_content': time_filter_sql, time_filter_param = "CAST(`created_time` AS UNSIGNED) >= %s", str(int(start_time.timestamp()))
else: time_filter_sql, time_filter_param = "`create_time` >= %s", str(int(start_time.timestamp()))
content_type = 'note' if table in ['weibo_note', 'xhs_note'] else 'content' if table == 'zhihu_content' else 'video'
query_template = "SELECT '{platform}' as p, '{type}' as t, {title} as title, {author} as author, {url} as url, {ts} as ts, {formula} as hotness_score, source_keyword, '{tbl}' as tbl FROM `{tbl}` WHERE {time_filter}"
field_subs = {'platform': table.split('_')[0], 'type': content_type, 'title': 'title', 'author': 'nickname', 'url': 'video_url', 'ts': 'create_time', 'formula': formula, 'tbl': table, 'time_filter': time_filter_sql}
if table == 'weibo_note': field_subs.update({'title': 'content', 'url': 'note_url', 'ts': 'create_date_time'})
elif table == 'xhs_note': field_subs.update({'ts': 'time', 'url': 'note_url'})
elif table == 'zhihu_content': field_subs.update({'author': 'user_nickname', 'url': 'content_url', 'ts': 'created_time'})
elif table == 'douyin_aweme': field_subs.update({'url': 'aweme_url'})
all_queries.append(query_template.format(**field_subs))
params.append(time_filter_param)
final_query = f"({' ) UNION ALL ( '.join(all_queries)}) ORDER BY hotness_score DESC LIMIT %s"
raw_results = self._execute_query(final_query, tuple(params) + (limit,))
formatted_results = [QueryResult(platform=r['p'], content_type=r['t'], title_or_content=r['title'], author_nickname=r.get('author'), url=r['url'], publish_time=self._to_datetime(r['ts']), engagement=self._extract_engagement(r), hotness_score=r.get('hotness_score', 0.0), source_keyword=r.get('source_keyword'), source_table=r['tbl']) for r in raw_results]
return DBResponse("search_hot_content", params_for_log, results=formatted_results, results_count=len(formatted_results))
def search_topic_globally(self, topic: str, limit_per_table: int = 5) -> DBResponse:
"""
print(f"--- TOOL: 基础新闻搜索 (query: {query}) ---")
return self._search_internal(
query=query,
max_results=max_results,
search_depth="basic",
include_answer=False
)
【工具】全局话题搜索: 在数据库中(内容、评论、标签、来源关键字)全面搜索指定话题。
def deep_search_news(self, query: str) -> TavilyResponse:
"""
【工具】深度新闻分析: 对一个主题进行最全面、最深入的搜索。
返回AI生成的“高级”详细摘要答案和最多20条最相关的新闻结果。适用于需要全面了解某个事件背景的场景。
Agent只需提供搜索查询(query)。
"""
print(f"--- TOOL: 深度新闻分析 (query: {query}) ---")
return self._search_internal(
query=query, search_depth="advanced", max_results=20, include_answer="advanced"
)
Args:
topic (str): 要搜索的话题关键词。
limit_per_table (int): 从每个相关表中返回的最大记录数,默认为 5。
def search_news_last_24_hours(self, query: str) -> TavilyResponse:
Returns:
DBResponse: 包含所有匹配结果的聚合列表。
"""
【工具】搜索24小时内新闻: 获取关于某个主题的最新动态。
此工具专门查找过去24小时内发布的新闻。适用于追踪突发事件或最新进展。
Agent只需提供搜索查询(query)。
params_for_log = {'topic': topic, 'limit_per_table': limit_per_table}
print(f"--- TOOL: 全局话题搜索 (params: {params_for_log}) ---")
search_term, all_results = f"%{topic}%", []
search_configs = { 'bilibili_video': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'video'}, 'bilibili_video_comment': {'fields': ['content'], 'type': 'comment'}, 'douyin_aweme': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'video'}, 'douyin_aweme_comment': {'fields': ['content'], 'type': 'comment'}, 'kuaishou_video': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'video'}, 'kuaishou_video_comment': {'fields': ['content'], 'type': 'comment'}, 'weibo_note': {'fields': ['content', 'source_keyword'], 'type': 'note'}, 'weibo_note_comment': {'fields': ['content'], 'type': 'comment'}, 'xhs_note': {'fields': ['title', 'desc', 'tag_list', 'source_keyword'], 'type': 'note'}, 'xhs_note_comment': {'fields': ['content'], 'type': 'comment'}, 'zhihu_content': {'fields': ['title', 'desc', 'content_text', 'source_keyword'], 'type': 'content'}, 'zhihu_comment': {'fields': ['content'], 'type': 'comment'}, 'tieba_note': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'note'}, 'tieba_comment': {'fields': ['content'], 'type': 'comment'}, 'daily_news': {'fields': ['title'], 'type': 'news'}, }
for table, config in search_configs.items():
where_clause = " OR ".join([f"`{field}` LIKE %s" for field in config['fields']])
query = f"SELECT * FROM `{table}` WHERE {where_clause} ORDER BY id DESC LIMIT %s"
params = (search_term,) * len(config['fields']) + (limit_per_table,)
raw_results = self._execute_query(query, params)
for row in raw_results:
content = (row.get('title') or row.get('content') or row.get('desc') or row.get('content_text', ''))
time_key = row.get('create_time') or row.get('time') or row.get('created_time') or row.get('publish_time') or row.get('crawl_date')
all_results.append(QueryResult(
platform=table.split('_')[0], content_type=config['type'],
title_or_content=content[:500] if content else '',
author_nickname=row.get('nickname') or row.get('user_nickname') or row.get('user_name'),
url=row.get('video_url') or row.get('note_url') or row.get('content_url') or row.get('url') or row.get('aweme_url'),
publish_time=self._to_datetime(time_key),
engagement=self._extract_engagement(row),
source_keyword=row.get('source_keyword'),
source_table=table
))
return DBResponse("search_topic_globally", params_for_log, results=all_results, results_count=len(all_results))
def search_topic_by_date(self, topic: str, start_date: str, end_date: str, limit_per_table: int = 10) -> DBResponse:
"""
print(f"--- TOOL: 搜索24小时内新闻 (query: {query}) ---")
return self._search_internal(query=query, time_range='d', max_results=10)
【工具】按日期搜索话题: 在明确的历史时间段内,搜索与特定话题相关的内容。
def search_news_last_week(self, query: str) -> TavilyResponse:
Args:
topic (str): 要搜索的话题关键词。
start_date (str): 开始日期,格式 'YYYY-MM-DD'。
end_date (str): 结束日期,格式 'YYYY-MM-DD'。
limit_per_table (int): 从每个相关表中返回的最大记录数,默认为 10。
Returns:
DBResponse: 包含在指定日期范围内找到的结果的聚合列表。
"""
【工具】搜索本周新闻: 获取关于某个主题过去一周内的主要新闻报道。
适用于进行周度舆情总结或回顾。
Agent只需提供搜索查询(query)。
params_for_log = {'topic': topic, 'start_date': start_date, 'end_date': end_date, 'limit_per_table': limit_per_table}
print(f"--- TOOL: 按日期搜索话题 (params: {params_for_log}) ---")
try:
start_dt, end_dt = datetime.strptime(start_date, '%Y-%m-%d'), datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=1)
except ValueError:
return DBResponse("search_topic_by_date", params_for_log, error_message="日期格式错误,请使用 'YYYY-MM-DD' 格式。")
search_term, all_results = f"%{topic}%", []
search_configs = {
'bilibili_video': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'video', 'time_col': 'create_time', 'time_type': 'sec'}, 'douyin_aweme': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'video', 'time_col': 'create_time', 'time_type': 'ms'},
'kuaishou_video': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'video', 'time_col': 'create_time', 'time_type': 'ms'}, 'weibo_note': {'fields': ['content', 'source_keyword'], 'type': 'note', 'time_col': 'create_date_time', 'time_type': 'str'},
'xhs_note': {'fields': ['title', 'desc', 'tag_list', 'source_keyword'], 'type': 'note', 'time_col': 'time', 'time_type': 'ms'}, 'zhihu_content': {'fields': ['title', 'desc', 'content_text', 'source_keyword'], 'type': 'content', 'time_col': 'created_time', 'time_type': 'sec_str'},
'tieba_note': {'fields': ['title', 'desc', 'source_keyword'], 'type': 'note', 'time_col': 'publish_time', 'time_type': 'str'}, 'daily_news': {'fields': ['title'], 'type': 'news', 'time_col': 'crawl_date', 'time_type': 'date_str'},
}
for table, config in search_configs.items():
topic_clause = " OR ".join([f"`{field}` LIKE %s" for field in config['fields']])
time_col, time_type = config['time_col'], config['time_type']
if time_type == 'sec': time_params = (int(start_dt.timestamp()), int(end_dt.timestamp()))
elif time_type == 'ms': time_params = (int(start_dt.timestamp() * 1000), int(end_dt.timestamp() * 1000))
elif time_type in ['str', 'date_str']: time_params = (start_dt.strftime('%Y-%m-%d'), end_dt.strftime('%Y-%m-%d'))
else: time_params = (str(int(start_dt.timestamp())), str(int(end_dt.timestamp())))
time_clause = f"`{time_col}` >= %s AND `{time_col}` < %s"
if table == 'zhihu_content': time_clause = f"CAST(`{time_col}` AS UNSIGNED) >= %s AND CAST(`{time_col}` AS UNSIGNED) < %s"
query = f"SELECT * FROM `{table}` WHERE ({topic_clause}) AND ({time_clause}) ORDER BY id DESC LIMIT %s"
params = (search_term,) * len(config['fields']) + time_params + (limit_per_table,)
raw_results = self._execute_query(query, params)
for row in raw_results:
content = (row.get('title') or row.get('content') or row.get('desc') or row.get('content_text', ''))
all_results.append(QueryResult(
platform=table.split('_')[0], content_type=config['type'],
title_or_content=content[:500] if content else '',
author_nickname=row.get('nickname') or row.get('user_nickname'),
url=row.get('video_url') or row.get('note_url') or row.get('content_url') or row.get('url') or row.get('aweme_url'),
publish_time=self._to_datetime(row.get(config['time_col'])),
engagement=self._extract_engagement(row),
source_keyword=row.get('source_keyword'),
source_table=table
))
return DBResponse("search_topic_by_date", params_for_log, results=all_results, results_count=len(all_results))
def get_comments_for_topic(self, topic: str, limit: int = 50) -> DBResponse:
"""
print(f"--- TOOL: 搜索本周新闻 (query: {query}) ---")
return self._search_internal(query=query, time_range='w', max_results=10)
【工具】获取话题评论: 专门搜索并返回所有平台中与特定话题相关的公众评论数据。
Args:
topic (str): 要搜索的话题关键词。
limit (int): 返回评论的总数量上限,默认为 50。
def search_images_for_news(self, query: str) -> TavilyResponse:
Returns:
DBResponse: 包含匹配的评论列表。
"""
【工具】查找新闻图片: 搜索与某个新闻主题相关的图片。
此工具会返回图片链接及描述,适用于需要为报告或文章配图的场景。
Agent只需提供搜索查询(query)。
params_for_log = {'topic': topic, 'limit': limit}
print(f"--- TOOL: 获取话题评论 (params: {params_for_log}) ---")
search_term = f"%{topic}%"
comment_tables = ['bilibili_video_comment', 'douyin_aweme_comment', 'kuaishou_video_comment', 'weibo_note_comment', 'xhs_note_comment', 'zhihu_comment', 'tieba_comment']
all_queries = []
for table in comment_tables:
cols = self._get_table_columns(table)
author_col = 'user_nickname' if 'user_nickname' in cols else 'nickname'
like_col = 'comment_like_count' if 'comment_like_count' in cols else 'like_count' if 'like_count' in cols else None
time_col = 'publish_time' if 'publish_time' in cols else 'create_date_time' if 'create_date_time' in cols else 'create_time'
like_select = f"`{like_col}` as likes" if like_col else "'0' as likes"
query = (f"SELECT '{table.split('_')[0]}' as platform, `content`, `{author_col}` as author, "
f"`{time_col}` as ts, {like_select}, '{table}' as source_table "
f"FROM `{table}` WHERE `content` LIKE %s")
all_queries.append(query)
final_query = f"({' ) UNION ALL ( '.join(all_queries)}) ORDER BY ts DESC LIMIT %s"
params = (search_term,) * len(comment_tables) + (limit,)
raw_results = self._execute_query(final_query, params)
formatted = [QueryResult(platform=r['platform'], content_type='comment', title_or_content=r['content'], author_nickname=r['author'], publish_time=self._to_datetime(r['ts']), engagement={'likes': int(r['likes']) if str(r['likes']).isdigit() else 0}, source_table=r['source_table']) for r in raw_results]
return DBResponse("get_comments_for_topic", params_for_log, results=formatted, results_count=len(formatted))
def search_topic_on_platform(
self,
platform: Literal['bilibili', 'weibo', 'douyin', 'kuaishou', 'xhs', 'zhihu', 'tieba'],
topic: str,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
limit: int = 20
) -> DBResponse:
"""
print(f"--- TOOL: 查找新闻图片 (query: {query}) ---")
return self._search_internal(
query=query, include_images=True, include_image_descriptions=True, max_results=5
)
【工具】平台定向搜索: (新增) 在指定的单个社交媒体平台上搜索特定话题。
def search_news_by_date(self, query: str, start_date: str, end_date: str) -> TavilyResponse:
"""
【工具】按指定日期范围搜索新闻: 在一个明确的历史时间段内搜索新闻。
这是唯一需要Agent提供详细时间参数的工具。适用于需要对特定历史事件进行分析的场景。
Agent需要提供查询(query)、开始日期(start_date)和结束日期(end_date),格式均为 'YYYY-MM-DD'。
Args:
platform (Literal['bilibili', ...]): 要搜索的平台,必须是七个支持的平台之一。
topic (str): 要搜索的话题关键词。
start_date (Optional[str]): 开始日期,格式 'YYYY-MM-DD'。默认为None。
end_date (Optional[str]): 结束日期,格式 'YYYY-MM-DD'。默认为None。
limit (int): 返回结果的最大数量,默认为 20。
Returns:
DBResponse: 包含在该平台找到的结果列表。
"""
print(f"--- TOOL: 按指定日期范围搜索新闻 (query: {query}, from: {start_date}, to: {end_date}) ---")
return self._search_internal(
query=query, start_date=start_date, end_date=end_date, max_results=15
)
params_for_log = {'platform': platform, 'topic': topic, 'start_date': start_date, 'end_date': end_date, 'limit': limit}
print(f"--- TOOL: 平台定向搜索 (params: {params_for_log}) ---")
all_configs = { 'bilibili': [{'table': 'bilibili_video', 'fields': ['title', 'desc', 'source_keyword'], 'type': 'video', 'time_col': 'create_time', 'time_type': 'sec'}, {'table': 'bilibili_video_comment', 'fields': ['content'], 'type': 'comment'}], 'douyin': [{'table': 'douyin_aweme', 'fields': ['title', 'desc', 'source_keyword'], 'type': 'video', 'time_col': 'create_time', 'time_type': 'ms'}, {'table': 'douyin_aweme_comment', 'fields': ['content'], 'type': 'comment'}], 'kuaishou': [{'table': 'kuaishou_video', 'fields': ['title', 'desc', 'source_keyword'], 'type': 'video', 'time_col': 'create_time', 'time_type': 'ms'}, {'table': 'kuaishou_video_comment', 'fields': ['content'], 'type': 'comment'}], 'weibo': [{'table': 'weibo_note', 'fields': ['content', 'source_keyword'], 'type': 'note', 'time_col': 'create_date_time', 'time_type': 'str'}, {'table': 'weibo_note_comment', 'fields': ['content'], 'type': 'comment'}], 'xhs': [{'table': 'xhs_note', 'fields': ['title', 'desc', 'tag_list', 'source_keyword'], 'type': 'note', 'time_col': 'time', 'time_type': 'ms'}, {'table': 'xhs_note_comment', 'fields': ['content'], 'type': 'comment'}], 'zhihu': [{'table': 'zhihu_content', 'fields': ['title', 'desc', 'content_text', 'source_keyword'], 'type': 'content', 'time_col': 'created_time', 'time_type': 'sec_str'}, {'table': 'zhihu_comment', 'fields': ['content'], 'type': 'comment'}], 'tieba': [{'table': 'tieba_note', 'fields': ['title', 'desc', 'source_keyword'], 'type': 'note', 'time_col': 'publish_time', 'time_type': 'str'}, {'table': 'tieba_comment', 'fields': ['content'], 'type': 'comment'}] }
# --- 3. 测试与使用示例 ---
if platform not in all_configs:
return DBResponse("search_topic_on_platform", params_for_log, error_message=f"不支持的平台: {platform}")
def print_response_summary(response: TavilyResponse):
"""简化的打印函数,用于展示测试结果,现在会显示发布日期"""
if not response or not response.query:
print("未能获取有效响应。")
search_term, all_results = f"%{topic}%", []
platform_configs = all_configs[platform]
time_clause, time_params_tuple = "", ()
if start_date and end_date:
try:
start_dt, end_dt = datetime.strptime(start_date, '%Y-%m-%d'), datetime.strptime(end_date, '%Y-%m-%d') + timedelta(days=1)
except ValueError:
return DBResponse("search_topic_on_platform", params_for_log, error_message="日期格式错误,请使用 'YYYY-MM-DD' 格式。")
else:
start_dt, end_dt = None, None
for config in platform_configs:
table = config['table']
topic_clause = " OR ".join([f"`{field}` LIKE %s" for field in config['fields']])
query = f"SELECT * FROM `{table}` WHERE {topic_clause}"
params = [search_term] * len(config['fields'])
if start_dt and end_dt and 'time_col' in config:
time_col, time_type = config['time_col'], config['time_type']
if time_type == 'sec': t_params = (int(start_dt.timestamp()), int(end_dt.timestamp()))
elif time_type == 'ms': t_params = (int(start_dt.timestamp() * 1000), int(end_dt.timestamp() * 1000))
elif time_type in ['str', 'date_str']: t_params = (start_dt.strftime('%Y-%m-%d'), end_dt.strftime('%Y-%m-%d'))
else: t_params = (str(int(start_dt.timestamp())), str(int(end_dt.timestamp())))
t_clause = f"`{time_col}` >= %s AND `{time_col}` < %s"
if table == 'zhihu_content': t_clause = f"CAST(`{time_col}` AS UNSIGNED) >= %s AND CAST(`{time_col}` AS UNSIGNED) < %s"
query += f" AND ({t_clause})"
params.extend(t_params)
query += f" ORDER BY id DESC LIMIT %s"
params.append(limit)
raw_results = self._execute_query(query, tuple(params))
for row in raw_results:
content = (row.get('title') or row.get('content') or row.get('desc') or row.get('content_text', ''))
time_key = config.get('time_col') and row.get(config.get('time_col'))
all_results.append(QueryResult(platform=platform, content_type=config['type'], title_or_content=content[:500] if content else '', author_nickname=row.get('nickname') or row.get('user_nickname'), url=row.get('video_url') or row.get('note_url') or row.get('content_url') or row.get('url') or row.get('aweme_url'), publish_time=self._to_datetime(time_key), engagement=self._extract_engagement(row), source_keyword=row.get('source_keyword'), source_table=table))
return DBResponse("search_topic_on_platform", params_for_log, results=all_results, results_count=len(all_results))
# --- 3. 测试与使用示例 ---
def print_response_summary(response: DBResponse):
"""简化的打印函数,用于展示测试结果"""
if response.error_message:
print(f"工具 '{response.tool_name}' 执行出错: {response.error_message}")
print("-" * 80)
return
print(f"\n查询: '{response.query}' | 耗时: {response.response_time}s")
if response.answer:
print(f"AI摘要: {response.answer[:120]}...")
print(f"找到 {len(response.results)} 条网页, {len(response.images)} 张图片。")
if response.results:
first_result = response.results[0]
date_info = f"(发布于: {first_result.published_date})" if first_result.published_date else ""
print(f"第一条结果: {first_result.title} {date_info}")
print("-" * 60)
params_str = ", ".join(f"{k}='{v}'" for k, v in response.parameters.items())
print(f"查询: 工具='{response.tool_name}', 参数=[{params_str}]")
print(f"找到 {response.results_count} 条相关记录。")
if response.results:
print("--- 前5条结果示例 ---")
for i, res in enumerate(response.results[:5]):
engagement_str = ", ".join(f"{k}: {v}" for k, v in res.engagement.items() if v)
content_preview = (res.title_or_content.replace('\n', ' ')[:70] + '...') if res.title_or_content and len(res.title_or_content) > 70 else res.title_or_content
hotness_str = f", hotness: {res.hotness_score:.2f}" if res.hotness_score > 0 else ""
print(
f"{i+1}. [{res.platform.upper()}/{res.content_type}] {content_preview}\n"
f" by: {res.author_nickname}, at: {res.publish_time.strftime('%Y-%m-%d %H:%M') if res.publish_time else 'N/A'}"
f", src_kw: '{res.source_keyword or 'N/A'}'{hotness_str}"
f", engagement: {{{engagement_str}}}"
)
print("-" * 80)
if __name__ == "__main__":
# 在运行前,请确保您已设置 TAVILY_API_KEY 环境变量
try:
# 初始化“新闻社”客户端,它内部包含了所有工具
agency = TavilyNewsAgency()
db_agent_tools = MediaCrawlerDB()
print("数据库工具初始化成功,开始执行测试场景...\n")
# 场景1: Agent 进行一次常规、快速的搜索
response1 = agency.basic_search_news(query="奥运会最新赛况", max_results=5)
# 场景1: (新) 查找过去一周综合热度最高的内容 (不再需要sort_by)
response1 = db_agent_tools.search_hot_content(time_period='week', limit=5)
print_response_summary(response1)
# 场景2: Agent 需要全面了解“全球芯片技术竞争”的背景
response2 = agency.deep_search_news(query="全球芯片技术竞争")
# 场景2: 查找过去24小时内综合热度最高的内容
response2 = db_agent_tools.search_hot_content(time_period='24h', limit=5)
print_response_summary(response2)
# 场景3: Agent 需要追踪“GTC大会”的最新消息
response3 = agency.search_news_last_24_hours(query="Nvidia GTC大会 最新发布")
# 场景3: 全局搜索"罗永浩"
response3 = db_agent_tools.search_topic_globally(topic="罗永浩", limit_per_table=2)
print_response_summary(response3)
# 场景4: Agent 需要为一篇关于“自动驾驶”的周报查找素材
response4 = agency.search_news_last_week(query="自动驾驶商业化落地")
# 场景4: (新增) 在B站上精确搜索"论文"
response4 = db_agent_tools.search_topic_on_platform(platform='bilibili', topic="论文", limit=5)
print_response_summary(response4)
# 场景5: Agent 需要查找“韦伯太空望远镜”的新闻图片
response5 = agency.search_images_for_news(query="韦伯太空望远镜最新发现")
# 场景5: (新增) 在微博上精确搜索 "许凯" 在特定一天内的内容
response5 = db_agent_tools.search_topic_on_platform(platform='weibo', topic="许凯", start_date='2025-08-22', end_date='2025-08-22', limit=5)
print_response_summary(response5)
# 场景6: Agent 需要研究2025年第一季度关于“人工智能法规”的新闻
response6 = agency.search_news_by_date(
query="人工智能法规",
start_date="2025-01-01",
end_date="2025-03-31"
)
print_response_summary(response6)
except ValueError as e:
print(f"初始化失败: {e}")
print("请确保 TAVILY_API_KEY 环境变量已正确设置。")
print("请确保相关的数据库环境变量已正确设置, 或在代码中直接提供连接信息。")
except Exception as e:
print(f"测试过程中发生未知错误: {e}")
\ No newline at end of file
... ...