
1. Sync MediaCrawler to the latest version

2. Fix database NOT NULL errors
3. Add PostgreSQL support
4. Standardize environment variables and configuration handling
5. Standardize installation on uv
6. Switch logging to loguru
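A note on item 6: the diff routes all output through loguru's default stderr sink. If persistent logs are wanted, a rotating file sink is one extra call; this is an illustrative configuration, not part of this diff (the path and retention policy are assumptions):

```python
from loguru import logger

# illustrative: add a rotating file sink alongside the default stderr sink
logger.add(
    "logs/mindspider_{time:YYYY-MM-DD}.log",  # hypothetical path
    rotation="00:00",      # start a new file at midnight
    retention="7 days",    # keep one week of history
    level="INFO",
)
logger.info("loguru file sink configured")
```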
... ... @@ -7,11 +7,12 @@ BroadTopicExtraction模块 - 数据库管理器
import sys
import json
from datetime import datetime, date
from datetime import datetime, date, timedelta
from pathlib import Path
from typing import List, Dict, Optional
import pymysql
from pymysql.cursors import DictCursor
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
from loguru import logger
# 添加项目根目录到路径
project_root = Path(__file__).parent.parent
... ... @@ -22,37 +23,44 @@ try:
except ImportError:
raise ImportError("无法导入config.py配置文件")
from config import settings
class DatabaseManager:
"""数据库管理器"""
def __init__(self):
"""初始化数据库管理器"""
self.connection = None
self.engine: Engine = None
self.connect()
def connect(self):
"""连接数据库"""
try:
self.connection = pymysql.connect(
host=config.DB_HOST,
port=config.DB_PORT,
user=config.DB_USER,
password=config.DB_PASSWORD,
database=config.DB_NAME,
charset=config.DB_CHARSET,
autocommit=True,
cursorclass=DictCursor
)
print(f"成功连接到数据库: {config.DB_NAME}")
dialect = (settings.DB_DIALECT or "mysql").lower()
if dialect in ("postgresql", "postgres"):
url = f"postgresql+psycopg://{settings.DB_USER}:{settings.DB_PASSWORD}@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"
else:
url = f"mysql+pymysql://{settings.DB_USER}:{settings.DB_PASSWORD}@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}?charset={settings.DB_CHARSET}"
self.engine = create_engine(url, future=True)
logger.info(f"成功连接到数据库: {settings.DB_NAME}")
except ModuleNotFoundError as e:
missing: str = str(e)
if "psycopg" in missing:
logger.error("数据库连接失败: 未安装PostgreSQL驱动 psycopg。请安装: psycopg[binary]。参考指令:uv pip install psycopg[binary]")
elif "pymysql" in missing:
logger.error("数据库连接失败: 未安装MySQL驱动 pymysql。请安装: pymysql。参考指令:uv pip install pymysql")
else:
logger.error(f"数据库连接失败(缺少驱动): {e}")
raise
except Exception as e:
print(f"数据库连接失败: {e}")
logger.error(f"数据库连接失败: {e}")
raise
def close(self):
"""关闭数据库连接"""
if self.connection:
self.connection.close()
print("数据库连接已关闭")
if self.engine:
self.engine.dispose()
logger.info("数据库连接已关闭")
def __enter__(self):
return self
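The dialect-to-URL mapping in `connect()` above is repeated almost verbatim in `KeywordManager.connect` and the schema management tool further down. A shared helper would keep the three in sync; a minimal sketch, assuming the same `settings` object (`build_db_url` and `build_engine` are hypothetical names, not part of this diff):

```python
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine

def build_db_url(settings, async_driver: bool = False) -> str:
    """Build a SQLAlchemy URL from the shared settings object."""
    dialect = (settings.DB_DIALECT or "mysql").lower()
    if dialect in ("postgresql", "postgres"):
        driver = "asyncpg" if async_driver else "psycopg"
        return (
            f"postgresql+{driver}://{settings.DB_USER}:{settings.DB_PASSWORD}"
            f"@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"
        )
    driver = "asyncmy" if async_driver else "pymysql"
    return (
        f"mysql+{driver}://{settings.DB_USER}:{settings.DB_PASSWORD}"
        f"@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"
        f"?charset={settings.DB_CHARSET}"
    )

def build_engine(settings) -> Engine:
    return create_engine(build_db_url(settings), future=True)
```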
... ... @@ -79,48 +87,49 @@ class DatabaseManager:
current_timestamp = int(datetime.now().timestamp())
try:
cursor = self.connection.cursor()
# 先删除当天所有的新闻记录(覆盖模式)
delete_query = "DELETE FROM daily_news WHERE crawl_date = %s"
deleted_count = cursor.execute(delete_query, (crawl_date,))
if deleted_count > 0:
print(f"覆盖模式:删除了当天已有的 {deleted_count} 条新闻记录")
# 批量插入新记录
saved_count = 0
# 先独立事务执行删除,防止后续插入失败导致无法清理
with self.engine.begin() as conn:
deleted = conn.execute(text("DELETE FROM daily_news WHERE crawl_date = :d"), {"d": crawl_date}).rowcount
if deleted and deleted > 0:
logger.info(f"覆盖模式:删除了当天已有的 {deleted} 条新闻记录")
# 逐条插入,单条失败不影响后续(每条独立事务)
for news_item in news_data:
try:
# 简化的新闻ID生成
news_id = f"{news_item.get('source', 'unknown')}_{news_item.get('id', news_item.get('rank', 0))}"
# 插入新记录
insert_query = """
INSERT INTO daily_news (
news_id, source_platform, title, url, crawl_date,
rank_position, add_ts
) VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_query, (
news_id,
news_item.get('source', 'unknown'),
news_item.get('title', ''),
news_item.get('url', ''),
crawl_date,
news_item.get('rank', None),
current_timestamp
))
title_val = (news_item.get("title", "") or "")
if len(title_val) > 500:
title_val = title_val[:500]
with self.engine.begin() as conn:
conn.execute(
text(
"""
INSERT INTO daily_news (
news_id, source_platform, title, url, crawl_date,
rank_position, add_ts, last_modify_ts
) VALUES (:news_id, :source_platform, :title, :url, :crawl_date, :rank_position, :add_ts, :last_modify_ts)
"""
),
{
"news_id": news_id,
"source_platform": news_item.get("source", "unknown"),
"title": title_val,
"url": news_item.get("url", ""),
"crawl_date": crawl_date,
"rank_position": news_item.get("rank", None),
"add_ts": current_timestamp,
"last_modify_ts": current_timestamp,
},
)
saved_count += 1
except Exception as e:
print(f"保存单条新闻失败: {e}")
logger.warning(f"保存单条新闻失败: {e}")
continue
print(f"成功保存 {saved_count} 条新闻记录")
logger.info(f"成功保存 {saved_count} 条新闻记录")
return saved_count
except Exception as e:
print(f"保存新闻数据失败: {e}")
logger.exception(f"保存新闻数据失败: {e}")
return 0
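The per-row `engine.begin()` above trades throughput for isolation: one malformed row cannot abort the batch, but every row pays its own transaction. Where all-or-nothing semantics are acceptable, a single transaction with a list of parameter dicts (SQLAlchemy's executemany path) is much faster; a sketch under that assumption, replacing the loop body:

```python
from sqlalchemy import text

insert_stmt = text(
    """
    INSERT INTO daily_news (
        news_id, source_platform, title, url, crawl_date,
        rank_position, add_ts, last_modify_ts
    ) VALUES (:news_id, :source_platform, :title, :url, :crawl_date,
              :rank_position, :add_ts, :last_modify_ts)
    """
)
rows = [
    {
        "news_id": f"{item.get('source', 'unknown')}_{item.get('id', item.get('rank', 0))}",
        "source_platform": item.get("source", "unknown"),
        "title": (item.get("title", "") or "")[:500],
        "url": item.get("url", ""),
        "crawl_date": crawl_date,
        "rank_position": item.get("rank"),
        "add_ts": current_timestamp,
        "last_modify_ts": current_timestamp,
    }
    for item in news_data
]
if rows:
    with self.engine.begin() as conn:
        conn.execute(insert_stmt, rows)  # a list of dicts triggers executemany
```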
def get_daily_news(self, crawl_date: date = None) -> List[Dict]:
... ... @@ -136,15 +145,13 @@ class DatabaseManager:
if not crawl_date:
crawl_date = date.today()
query = """
SELECT * FROM daily_news
WHERE crawl_date = %s
ORDER BY rank_position ASC
"""
cursor = self.connection.cursor()
cursor.execute(query, (crawl_date,))
return cursor.fetchall()
query = (
"SELECT * FROM daily_news WHERE crawl_date = :d ORDER BY rank_position ASC"
)
with self.engine.connect() as conn:
result = conn.execute(text(query), {"d": crawl_date})
rows = result.mappings().all()
# RowMapping 是只读的,转为普通dict以匹配 List[Dict] 返回类型
return [dict(r) for r in rows]
# ==================== 话题数据操作 ====================
... ... @@ -166,37 +173,31 @@ class DatabaseManager:
current_timestamp = int(datetime.now().timestamp())
try:
cursor = self.connection.cursor()
# 检查今天是否已有记录
check_query = "SELECT id FROM daily_topics WHERE extract_date = %s"
cursor.execute(check_query, (extract_date,))
existing = cursor.fetchone()
keywords_json = json.dumps(keywords, ensure_ascii=False)
if existing:
# 更新现有记录
update_query = """
UPDATE daily_topics
SET keywords = %s, summary = %s, add_ts = %s
WHERE extract_date = %s
"""
cursor.execute(update_query, (keywords_json, summary, current_timestamp, extract_date))
print(f"更新了 {extract_date} 的话题分析")
else:
# 插入新记录
insert_query = """
INSERT INTO daily_topics (extract_date, keywords, summary, add_ts)
VALUES (%s, %s, %s, %s)
"""
cursor.execute(insert_query, (extract_date, keywords_json, summary, current_timestamp))
print(f"保存了 {extract_date} 的话题分析")
with self.engine.begin() as conn:
check = conn.execute(
text("SELECT id FROM daily_topics WHERE extract_date = :d AND topic_id = :tid"),
{"d": extract_date, "tid": "summary"},
).first()
if check:
conn.execute(
text(
"UPDATE daily_topics SET keywords = :k, topic_description = :s, add_ts = :ts, last_modify_ts = :lmt, topic_name = :tn WHERE extract_date = :d AND topic_id = :tid"
),
{"k": keywords_json, "s": summary, "ts": current_timestamp, "lmt": current_timestamp, "d": extract_date, "tid": "summary", "tn": "每日新闻分析"},
)
logger.info(f"更新了 {extract_date} 的话题分析")
else:
conn.execute(
text(
"INSERT INTO daily_topics (extract_date, topic_id, topic_name, keywords, topic_description, add_ts, last_modify_ts) VALUES (:d, :tid, :tn, :k, :s, :ts, :lmt)"
),
{"d": extract_date, "tid": "summary", "tn": "每日新闻分析", "k": keywords_json, "s": summary, "ts": current_timestamp, "lmt": current_timestamp},
)
logger.info(f"保存了 {extract_date} 的话题分析")
return True
except Exception as e:
print(f"保存话题分析失败: {e}")
logger.exception(f"保存话题分析失败: {e}")
return False
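The SELECT-then-UPDATE/INSERT in `save_daily_topics` is not atomic: two concurrent runs can both miss the SELECT and insert twice. If `(extract_date, topic_id)` is covered by a unique constraint (an assumption; the schema is not shown in this diff), a native upsert closes that window. A sketch for both dialects (`is_postgresql` is a hypothetical flag mirroring the dialect check in `connect()`):

```python
from sqlalchemy import text

params = {
    "d": extract_date, "tid": "summary", "tn": "每日新闻分析",
    "k": keywords_json, "s": summary,
    "ts": current_timestamp, "lmt": current_timestamp,
}
if is_postgresql:  # assumed: True when DB_DIALECT is postgresql
    stmt = text(
        "INSERT INTO daily_topics (extract_date, topic_id, topic_name, keywords, "
        "topic_description, add_ts, last_modify_ts) "
        "VALUES (:d, :tid, :tn, :k, :s, :ts, :lmt) "
        "ON CONFLICT (extract_date, topic_id) DO UPDATE SET "
        "keywords = EXCLUDED.keywords, "
        "topic_description = EXCLUDED.topic_description, "
        "last_modify_ts = EXCLUDED.last_modify_ts"
    )
else:
    stmt = text(
        "INSERT INTO daily_topics (extract_date, topic_id, topic_name, keywords, "
        "topic_description, add_ts, last_modify_ts) "
        "VALUES (:d, :tid, :tn, :k, :s, :ts, :lmt) "
        "ON DUPLICATE KEY UPDATE "
        "keywords = VALUES(keywords), "
        "topic_description = VALUES(topic_description), "
        "last_modify_ts = VALUES(last_modify_ts)"
    )
with self.engine.begin() as conn:
    conn.execute(stmt, params)
```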
def get_daily_topics(self, extract_date: date = None) -> Optional[Dict]:
... ... @@ -213,20 +214,15 @@ class DatabaseManager:
extract_date = date.today()
try:
cursor = self.connection.cursor()
query = "SELECT * FROM daily_topics WHERE extract_date = %s"
cursor.execute(query, (extract_date,))
result = cursor.fetchone()
if result:
# 解析关键词JSON
result['keywords'] = json.loads(result['keywords'])
return result
else:
with self.engine.connect() as conn:
result = conn.execute(text("SELECT * FROM daily_topics WHERE extract_date = :d"), {"d": extract_date}).mappings().first()
if result:
result = dict(result) # 转为可变dict以支持item赋值
result["keywords"] = json.loads(result["keywords"]) if result.get("keywords") else []
return result
return None
except Exception as e:
print(f"获取话题分析失败: {e}")
logger.exception(f"获取话题分析失败: {e}")
return None
def get_recent_topics(self, days: int = 7) -> List[Dict]:
... ... @@ -240,23 +236,23 @@ class DatabaseManager:
话题分析列表
"""
try:
cursor = self.connection.cursor()
query = """
SELECT * FROM daily_topics
WHERE extract_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
ORDER BY extract_date DESC
"""
cursor.execute(query, (days,))
results = cursor.fetchall()
# 解析每个结果的关键词JSON
for result in results:
result['keywords'] = json.loads(result['keywords'])
return results
start_date = date.today() - timedelta(days=days)
with self.engine.connect() as conn:
results = conn.execute(
text(
"""
SELECT * FROM daily_topics
WHERE extract_date >= :start_date
ORDER BY extract_date DESC
"""
),
{"start_date": start_date},
).mappings().all()
# RowMapping 是只读的,先转为可变dict再解析关键词JSON
results = [dict(r) for r in results]
for r in results:
r["keywords"] = json.loads(r["keywords"]) if r.get("keywords") else []
return results
except Exception as e:
print(f"获取最近话题分析失败: {e}")
logger.exception(f"获取最近话题分析失败: {e}")
return []
# ==================== 统计查询 ====================
... ... @@ -264,56 +260,48 @@ class DatabaseManager:
def get_summary_stats(self, days: int = 7) -> Dict:
"""获取统计摘要"""
try:
cursor = self.connection.cursor()
# 新闻统计
news_query = """
SELECT
crawl_date,
COUNT(*) as news_count,
COUNT(DISTINCT source_platform) as platforms_count
FROM daily_news
WHERE crawl_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
GROUP BY crawl_date
ORDER BY crawl_date DESC
"""
cursor.execute(news_query, (days,))
news_stats = cursor.fetchall()
# 话题统计
topics_query = """
SELECT
extract_date,
keywords,
CHAR_LENGTH(summary) as summary_length
FROM daily_topics
WHERE extract_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
ORDER BY extract_date DESC
"""
cursor.execute(topics_query, (days,))
topics_stats = cursor.fetchall()
return {
'news_stats': news_stats,
'topics_stats': topics_stats
}
start_date = date.today() - timedelta(days=days)
with self.engine.connect() as conn:
news_stats = conn.execute(
text(
"""
SELECT crawl_date, COUNT(*) as news_count, COUNT(DISTINCT source_platform) as platforms_count
FROM daily_news
WHERE crawl_date >= :start_date
GROUP BY crawl_date
ORDER BY crawl_date DESC
"""
),
{"start_date": start_date},
).mappings().all()
topics_stats = conn.execute(
text(
"""
SELECT extract_date, keywords, CHAR_LENGTH(topic_description) as summary_length
FROM daily_topics
WHERE extract_date >= :start_date
ORDER BY extract_date DESC
"""
),
{"start_date": start_date},
).mappings().all()
return {"news_stats": news_stats, "topics_stats": topics_stats}
except Exception as e:
print(f"获取统计摘要失败: {e}")
return {'news_stats': [], 'topics_stats': []}
logger.exception(f"获取统计摘要失败: {e}")
return {"news_stats": [], "topics_stats": []}
if __name__ == "__main__":
# 测试数据库管理器
with DatabaseManager() as db:
# 测试获取新闻
news = db.get_daily_news()
print(f"今日新闻数量: {len(news)}")
logger.info(f"今日新闻数量: {len(news)}")
# 测试获取话题
topics = db.get_daily_topics()
if topics:
print(f"今日话题关键词: {topics['keywords']}")
logger.info(f"今日话题关键词: {topics['keywords']}")
else:
print("今日暂无话题分析")
logger.info("今日暂无话题分析")
print("简化数据库管理器测试完成!")
logger.info("简化数据库管理器测试完成!")
... ...
... ... @@ -11,6 +11,7 @@ import argparse
from datetime import datetime, date
from pathlib import Path
from typing import List, Dict, Optional
from loguru import logger
# 添加项目根目录到路径
project_root = Path(__file__).parent.parent
... ... @@ -21,8 +22,8 @@ try:
from BroadTopicExtraction.topic_extractor import TopicExtractor
from BroadTopicExtraction.database_manager import DatabaseManager
except ImportError as e:
print(f"导入模块失败: {e}")
print("请确保在项目根目录运行,并且已安装所有依赖")
logger.exception(f"导入模块失败: {e}")
logger.error("请确保在项目根目录运行,并且已安装所有依赖")
sys.exit(1)
class BroadTopicExtraction:
... ... @@ -34,7 +35,7 @@ class BroadTopicExtraction:
self.topic_extractor = TopicExtractor()
self.db_manager = DatabaseManager()
print("BroadTopicExtraction 初始化完成")
logger.info("BroadTopicExtraction 初始化完成")
def close(self):
"""关闭资源"""
... ... @@ -68,21 +69,22 @@ class BroadTopicExtraction:
Returns:
包含完整提取结果的字典
"""
print("\n" + "=" * 80)
print("MindSpider AI爬虫 - 每日话题提取")
print("=" * 80)
print(f"执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"目标日期: {date.today()}")
extraction_result_message = ""
extraction_result_message += "\nMindSpider AI爬虫 - 每日话题提取\n"
extraction_result_message += f"执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
extraction_result_message += f"目标日期: {date.today()}\n"
if news_sources:
print(f"指定平台: {len(news_sources)} 个")
extraction_result_message += f"指定平台: {len(news_sources)} 个\n"
for source in news_sources:
source_name = SOURCE_NAMES.get(source, source)
print(f" - {source_name}")
extraction_result_message += f" - {source_name}\n"
else:
print(f"爬取平台: 全部 {len(SOURCE_NAMES)} 个平台")
extraction_result_message += f"爬取平台: 全部 {len(SOURCE_NAMES)} 个平台\n"
print(f"关键词数: 最多 {max_keywords} 个")
extraction_result_message += f"关键词数: 最多 {max_keywords} 个\n"
logger.info(extraction_result_message)
extraction_result = {
'success': False,
... ... @@ -96,7 +98,7 @@ class BroadTopicExtraction:
try:
# 步骤1: 收集新闻
print("\n【步骤1】收集热点新闻...")
logger.info("【步骤1】收集热点新闻...")
news_result = await self.news_collector.collect_and_save_news(
sources=news_sources
)
... ... @@ -112,7 +114,7 @@ class BroadTopicExtraction:
raise Exception("新闻收集失败或没有获取到新闻")
# 步骤2: 提取关键词和生成总结
print("\n【步骤2】提取关键词和生成总结...")
logger.info("【步骤2】提取关键词和生成总结...")
keywords, summary = self.topic_extractor.extract_keywords_and_summary(
news_result['news_list'],
max_keywords=max_keywords
... ... @@ -126,10 +128,10 @@ class BroadTopicExtraction:
}
if not keywords:
print("警告: 没有提取到有效关键词")
logger.warning("警告: 没有提取到有效关键词")
# 步骤3: 保存到数据库
print("\n【步骤3】保存分析结果到数据库...")
logger.info("【步骤3】保存分析结果到数据库...")
save_success = self.db_manager.save_daily_topics(
keywords, summary, date.today()
)
... ... @@ -141,56 +143,47 @@ class BroadTopicExtraction:
extraction_result['success'] = True
extraction_result['end_time'] = datetime.now().isoformat()
print("\n" + "=" * 80)
print("每日话题提取流程完成!")
print("=" * 80)
logger.info("每日话题提取流程完成!")
return extraction_result
except Exception as e:
print(f"\n话题提取流程失败: {e}")
logger.exception(f"话题提取流程失败: {e}")
extraction_result['error'] = str(e)
extraction_result['end_time'] = datetime.now().isoformat()
return extraction_result
def print_extraction_results(self, extraction_result: Dict):
"""打印提取结果"""
print("\n" + "=" * 80)
print("话题提取结果报告")
print("=" * 80)
if not extraction_result['success']:
print(f"❌ 提取失败: {extraction_result.get('error', '未知错误')}")
return
extraction_result_message = ""
# 新闻收集结果
news_data = extraction_result.get('news_collection', {})
print(f"📰 新闻收集: {news_data.get('total_news', 0)} 条新闻")
print(f" 成功源数: {news_data.get('successful_sources', 0)}/{news_data.get('total_sources', 0)}")
extraction_result_message += f"\n📰 新闻收集: {news_data.get('total_news', 0)} 条新闻\n"
extraction_result_message += f" 成功源数: {news_data.get('successful_sources', 0)}/{news_data.get('total_sources', 0)}\n"
# 话题提取结果
topic_data = extraction_result.get('topic_extraction', {})
keywords = topic_data.get('keywords', [])
summary = topic_data.get('summary', '')
print(f"\n🔑 提取关键词: {len(keywords)} 个")
extraction_result_message += f"\n🔑 提取关键词: {len(keywords)} 个\n"
if keywords:
# 每行显示5个关键词
for i in range(0, len(keywords), 5):
keyword_group = keywords[i:i+5]
print(f" {', '.join(keyword_group)}")
extraction_result_message += f" {', '.join(keyword_group)}\n"
print(f"\n📝 新闻总结:")
print(f" {summary}")
extraction_result_message += f"\n📝 新闻总结:\n {summary}\n"
# 数据库保存结果
db_data = extraction_result.get('database_save', {})
if db_data.get('success'):
print(f"\n💾 数据库保存: 成功")
extraction_result_message += f"\n💾 数据库保存: 成功\n"
else:
print(f"\n💾 数据库保存: 失败")
extraction_result_message += f"\n💾 数据库保存: 失败\n"
print("\n" + "=" * 80)
logger.info(extraction_result_message)
def get_keywords_for_crawling(self, extract_date: date = None) -> List[str]:
"""
... ... @@ -207,7 +200,7 @@ class BroadTopicExtraction:
topics_data = self.db_manager.get_daily_topics(extract_date)
if not topics_data:
print(f"没有找到 {extract_date or date.today()} 的话题数据")
logger.info(f"没有找到 {extract_date or date.today()} 的话题数据")
return []
keywords = topics_data['keywords']
... ... @@ -215,11 +208,11 @@ class BroadTopicExtraction:
# 生成搜索关键词
search_keywords = self.topic_extractor.get_search_keywords(keywords)
print(f"准备了 {len(search_keywords)} 个关键词用于爬取")
logger.info(f"准备了 {len(search_keywords)} 个关键词用于爬取")
return search_keywords
except Exception as e:
print(f"获取爬取关键词失败: {e}")
logger.error(f"获取爬取关键词失败: {e}")
return []
def get_daily_analysis(self, target_date: date = None) -> Optional[Dict]:
... ... @@ -227,7 +220,7 @@ class BroadTopicExtraction:
try:
return self.db_manager.get_daily_topics(target_date)
except Exception as e:
print(f"获取每日分析失败: {e}")
logger.error(f"获取每日分析失败: {e}")
return None
def get_recent_analysis(self, days: int = 7) -> List[Dict]:
... ... @@ -235,7 +228,7 @@ class BroadTopicExtraction:
try:
return self.db_manager.get_recent_topics(days)
except Exception as e:
print(f"获取最近分析失败: {e}")
logger.error(f"获取最近分析失败: {e}")
return []
# ==================== 命令行工具 ====================
... ... @@ -260,17 +253,17 @@ async def run_extraction_command(sources=None, keywords_count=100, show_details=
news_data = result.get('news_collection', {})
topic_data = result.get('topic_extraction', {})
print(f"✅ 话题提取成功完成!")
print(f" 收集新闻: {news_data.get('total_news', 0)} 条")
print(f" 提取关键词: {len(topic_data.get('keywords', []))} 个")
print(f" 生成总结: {len(topic_data.get('summary', ''))} 字符")
logger.info(f"✅ 话题提取成功完成!")
logger.info(f" 收集新闻: {news_data.get('total_news', 0)} 条")
logger.info(f" 提取关键词: {len(topic_data.get('keywords', []))} 个")
logger.info(f" 生成总结: {len(topic_data.get('summary', ''))} 字符")
# 获取爬取关键词
crawling_keywords = extractor.get_keywords_for_crawling()
if crawling_keywords:
print(f"\n🔑 为DeepSentimentCrawling准备的搜索关键词:")
print(f" {', '.join(crawling_keywords)}")
logger.info(f"\n🔑 为DeepSentimentCrawling准备的搜索关键词:")
logger.info(f" {', '.join(crawling_keywords)}")
# 保存关键词到文件
keywords_file = project_root / "data" / "daily_keywords.txt"
... ... @@ -279,16 +272,16 @@ async def run_extraction_command(sources=None, keywords_count=100, show_details=
with open(keywords_file, 'w', encoding='utf-8') as f:
f.write('\n'.join(crawling_keywords))
print(f" 关键词已保存到: {keywords_file}")
logger.info(f" 关键词已保存到: {keywords_file}")
return True
else:
print(f"❌ 话题提取失败: {result.get('error', '未知错误')}")
logger.error(f"❌ 话题提取失败: {result.get('error', '未知错误')}")
return False
except Exception as e:
print(f"❌ 执行过程中发生错误: {e}")
logger.error(f"❌ 执行过程中发生错误: {e}")
return False
def main():
... ... @@ -304,14 +297,14 @@ def main():
# 显示支持的新闻源
if args.list_sources:
print("支持的新闻源平台:")
logger.info("支持的新闻源平台:")
for source, name in SOURCE_NAMES.items():
print(f" {source:<25} {name}")
logger.info(f" {source:<25} {name}")
return
# 验证参数
if args.keywords < 1 or args.keywords > 200:
print("关键词数量应在1-200之间")
logger.error("关键词数量应在1-200之间")
sys.exit(1)
# 运行提取
... ... @@ -325,7 +318,7 @@ def main():
sys.exit(0 if success else 1)
except KeyboardInterrupt:
print("\n用户中断操作")
logger.info("用户中断操作")
sys.exit(1)
if __name__ == "__main__":
... ...
... ... @@ -18,19 +18,20 @@ sys.path.append(str(project_root))
try:
import config
from config import settings
except ImportError:
raise ImportError("无法导入config.py配置文件")
raise ImportError("无法导入settings.py配置文件")
class TopicExtractor:
"""话题提取器"""
def __init__(self):
"""初始化话题提取器"""
self.client = OpenAI(
api_key=config.DEEPSEEK_API_KEY,
base_url="https://api.deepseek.com"
api_key=settings.MINDSPIDER_API_KEY,
base_url=settings.MINDSPIDER_BASE_URL
)
self.model = "deepseek-chat"
self.model = settings.MINDSPIDER_MODEL_NAME
def extract_keywords_and_summary(self, news_list: List[Dict], max_keywords: int = 100) -> Tuple[List[str], str]:
"""
... ...
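For context, the client configured in `TopicExtractor.__init__` above is the standard OpenAI-compatible SDK pointed at the configured base URL, so the call inside `extract_keywords_and_summary` looks roughly like this (a sketch; the real prompt lives in the elided method body, and `temperature` is an assumed value):

```python
response = self.client.chat.completions.create(
    model=self.model,
    messages=[
        {"role": "system", "content": "你是新闻话题分析助手,输出JSON格式的关键词和总结。"},
        {"role": "user", "content": "从以下新闻标题中提取关键词并生成总结:..."},
    ],
    temperature=0.3,  # assumption; the diff does not show this parameter
)
content = response.choices[0].message.content
```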
... ... @@ -11,8 +11,8 @@ from datetime import date, timedelta, datetime
from pathlib import Path
from typing import List, Dict, Optional
import random
import pymysql
from pymysql.cursors import DictCursor
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
# 添加项目根目录到路径
project_root = Path(__file__).parent.parent
... ... @@ -23,30 +23,38 @@ try:
except ImportError:
raise ImportError("无法导入config.py配置文件")
from config import settings
from loguru import logger
class KeywordManager:
"""关键词管理器"""
def __init__(self):
"""初始化关键词管理器"""
self.connection = None
self.engine: Engine = None
self.connect()
def connect(self):
"""连接数据库"""
try:
self.connection = pymysql.connect(
host=config.DB_HOST,
port=config.DB_PORT,
user=config.DB_USER,
password=config.DB_PASSWORD,
database=config.DB_NAME,
charset=config.DB_CHARSET,
autocommit=True,
cursorclass=DictCursor
)
print(f"关键词管理器成功连接到数据库: {config.DB_NAME}")
dialect = (settings.DB_DIALECT or "mysql").lower()
if dialect in ("postgresql", "postgres"):
url = f"postgresql+psycopg://{settings.DB_USER}:{settings.DB_PASSWORD}@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"
else:
url = f"mysql+pymysql://{settings.DB_USER}:{settings.DB_PASSWORD}@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}?charset={settings.DB_CHARSET}"
self.engine = create_engine(url, future=True)
logger.info(f"关键词管理器成功连接到数据库: {settings.DB_NAME}")
except ModuleNotFoundError as e:
missing: str = str(e)
if "psycopg" in missing:
logger.error("数据库连接失败: 未安装PostgreSQL驱动 psycopg。请安装: psycopg[binary]。参考指令:uv pip install psycopg[binary]")
elif "pymysql" in missing:
logger.error("数据库连接失败: 未安装MySQL驱动 pymysql。请安装: pymysql。参考指令:uv pip install pymysql")
else:
logger.error(f"数据库连接失败(缺少驱动): {e}")
raise
except Exception as e:
print(f"关键词管理器数据库连接失败: {e}")
logger.exception(f"关键词管理器数据库连接失败: {e}")
raise
def get_latest_keywords(self, target_date: date = None, max_keywords: int = 100) -> List[str]:
... ... @@ -63,24 +71,24 @@ class KeywordManager:
if not target_date:
target_date = date.today()
print(f"正在获取 {target_date} 的关键词...")
logger.info(f"正在获取 {target_date} 的关键词...")
# 首先尝试获取指定日期的关键词
topics_data = self.get_daily_topics(target_date)
if topics_data and topics_data.get('keywords'):
keywords = topics_data['keywords']
print(f"成功获取 {target_date} 的 {len(keywords)} 个关键词")
logger.info(f"成功获取 {target_date} 的 {len(keywords)} 个关键词")
# 如果关键词太多,随机选择指定数量
if len(keywords) > max_keywords:
keywords = random.sample(keywords, max_keywords)
print(f"随机选择了 {max_keywords} 个关键词")
logger.info(f"随机选择了 {max_keywords} 个关键词")
return keywords
# 如果没有当天的关键词,尝试获取最近几天的
print(f"{target_date} 没有关键词数据,尝试获取最近的关键词...")
logger.info(f"{target_date} 没有关键词数据,尝试获取最近的关键词...")
recent_topics = self.get_recent_topics(days=7)
if recent_topics:
... ... @@ -95,11 +103,11 @@ class KeywordManager:
if len(unique_keywords) > max_keywords:
unique_keywords = random.sample(unique_keywords, max_keywords)
print(f"从最近7天的数据中获取到 {len(unique_keywords)} 个关键词")
logger.info(f"从最近7天的数据中获取到 {len(unique_keywords)} 个关键词")
return unique_keywords
# 如果都没有,返回默认关键词
print("没有找到任何关键词数据,使用默认关键词")
logger.info("没有找到任何关键词数据,使用默认关键词")
return self._get_default_keywords()
def get_daily_topics(self, extract_date: date = None) -> Optional[Dict]:
... ... @@ -116,20 +124,22 @@ class KeywordManager:
extract_date = date.today()
try:
cursor = self.connection.cursor()
query = "SELECT * FROM daily_topics WHERE extract_date = %s"
cursor.execute(query, (extract_date,))
result = cursor.fetchone()
with self.engine.connect() as conn:
result = conn.execute(
text("SELECT * FROM daily_topics WHERE extract_date = :d"),
{"d": extract_date},
).mappings().first()
if result:
# 解析关键词JSON
result['keywords'] = json.loads(result['keywords'])
# 转为可变dict再赋值
result = dict(result)
result['keywords'] = json.loads(result['keywords']) if result.get('keywords') else []
return result
else:
return None
except Exception as e:
print(f"获取话题分析失败: {e}")
logger.exception(f"获取话题分析失败: {e}")
return None
def get_recent_topics(self, days: int = 7) -> List[Dict]:
... ... @@ -143,23 +153,28 @@ class KeywordManager:
话题分析列表
"""
try:
cursor = self.connection.cursor()
query = """
SELECT * FROM daily_topics
WHERE extract_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
ORDER BY extract_date DESC
"""
cursor.execute(query, (days,))
results = cursor.fetchall()
start_date = date.today() - timedelta(days=days)
with self.engine.connect() as conn:
results = conn.execute(
text(
"""
SELECT * FROM daily_topics
WHERE extract_date >= :start_date
ORDER BY extract_date DESC
"""
),
{"start_date": start_date},
).mappings().all()
# 解析每个结果的关键词JSON
# 转为可变dict列表再处理
results = [dict(r) for r in results]
for result in results:
result['keywords'] = json.loads(result['keywords'])
result['keywords'] = json.loads(result['keywords']) if result.get('keywords') else []
return results
except Exception as e:
print(f"获取最近话题分析失败: {e}")
logger.exception(f"获取最近话题分析失败: {e}")
return []
def _get_default_keywords(self) -> List[str]:
... ... @@ -190,8 +205,8 @@ class KeywordManager:
keywords = self.get_latest_keywords(target_date, max_keywords)
if keywords:
print(f"为 {len(platforms)} 个平台准备了相同的 {len(keywords)} 个关键词")
print(f"每个关键词将在所有平台上进行爬取")
logger.info(f"为 {len(platforms)} 个平台准备了相同的 {len(keywords)} 个关键词")
logger.info(f"每个关键词将在所有平台上进行爬取")
return keywords
... ... @@ -210,7 +225,7 @@ class KeywordManager:
"""
keywords = self.get_latest_keywords(target_date, max_keywords)
print(f"为平台 {platform} 准备了 {len(keywords)} 个关键词(与其他平台相同)")
logger.info(f"为平台 {platform} 准备了 {len(keywords)} 个关键词(与其他平台相同)")
return keywords
def _filter_keywords_by_platform(self, keywords: List[str], platform: str) -> List[str]:
... ... @@ -290,9 +305,9 @@ class KeywordManager:
def close(self):
"""关闭数据库连接"""
if self.connection:
self.connection.close()
print("关键词管理器数据库连接已关闭")
if self.engine:
self.engine.dispose()
logger.info("关键词管理器数据库连接已关闭")
def __enter__(self):
return self
... ... @@ -305,16 +320,16 @@ if __name__ == "__main__":
with KeywordManager() as km:
# 测试获取关键词
keywords = km.get_latest_keywords(max_keywords=20)
print(f"获取到的关键词: {keywords}")
logger.info(f"获取到的关键词: {keywords}")
# 测试平台分配
platforms = ['xhs', 'dy', 'bili']
distribution = km.distribute_keywords_by_platform(keywords, platforms)
for platform, kws in distribution.items():
print(f"{platform}: {kws}")
logger.info(f"{platform}: {kws}")
# 测试爬取摘要
summary = km.get_crawling_summary()
print(f"爬取摘要: {summary}")
logger.info(f"爬取摘要: {summary}")
print("关键词管理器测试完成!")
logger.info("关键词管理器测试完成!")
... ...
... ... @@ -13,6 +13,7 @@ from datetime import datetime
from pathlib import Path
from typing import List, Dict, Optional
import json
from loguru import logger
# 添加项目根目录到路径
project_root = Path(__file__).parent.parent
... ... @@ -36,11 +37,15 @@ class PlatformCrawler:
if not self.mediacrawler_path.exists():
raise FileNotFoundError(f"MediaCrawler目录不存在: {self.mediacrawler_path}")
print(f"初始化平台爬虫管理器,MediaCrawler路径: {self.mediacrawler_path}")
logger.info(f"初始化平台爬虫管理器,MediaCrawler路径: {self.mediacrawler_path}")
def configure_mediacrawler_db(self):
"""配置MediaCrawler使用我们的MySQL数据库"""
"""配置MediaCrawler使用我们的数据库(MySQL或PostgreSQL)"""
try:
# 判断数据库类型
db_dialect = (config.settings.DB_DIALECT or "mysql").lower()
is_postgresql = db_dialect in ("postgresql", "postgres")
# 修改MediaCrawler的数据库配置
db_config_path = self.mediacrawler_path / "config" / "db_config.py"
... ... @@ -48,7 +53,14 @@ class PlatformCrawler:
with open(db_config_path, 'r', encoding='utf-8') as f:
content = f.read()
# 替换数据库配置
# PostgreSQL配置值:如果使用PostgreSQL则使用MindSpider配置,否则使用默认值或环境变量
pg_password = config.settings.DB_PASSWORD if is_postgresql else "bettafish"
pg_user = config.settings.DB_USER if is_postgresql else "bettafish"
pg_host = config.settings.DB_HOST if is_postgresql else "127.0.0.1"
pg_port = config.settings.DB_PORT if is_postgresql else 5432
pg_db_name = config.settings.DB_NAME if is_postgresql else "bettafish"
# 替换数据库配置 - 使用MindSpider的数据库配置
new_config = f'''# 声明:本代码仅供学习和研究目的使用。使用者应遵守以下原则:
# 1. 不得用于任何商业用途。
# 2. 使用时应遵守目标平台的使用条款和robots.txt规则。
... ... @@ -63,11 +75,19 @@ class PlatformCrawler:
import os
# mysql config - 使用MindSpider的数据库配置
MYSQL_DB_PWD = "{config.DB_PASSWORD}"
MYSQL_DB_USER = "{config.DB_USER}"
MYSQL_DB_HOST = "{config.DB_HOST}"
MYSQL_DB_PORT = {config.DB_PORT}
MYSQL_DB_NAME = "{config.DB_NAME}"
MYSQL_DB_PWD = "{config.settings.DB_PASSWORD}"
MYSQL_DB_USER = "{config.settings.DB_USER}"
MYSQL_DB_HOST = "{config.settings.DB_HOST}"
MYSQL_DB_PORT = {config.settings.DB_PORT}
MYSQL_DB_NAME = "{config.settings.DB_NAME}"
mysql_db_config = {{
"user": MYSQL_DB_USER,
"password": MYSQL_DB_PWD,
"host": MYSQL_DB_HOST,
"port": MYSQL_DB_PORT,
"db_name": MYSQL_DB_NAME,
}}
# redis config
... ... @@ -81,17 +101,39 @@ CACHE_TYPE_REDIS = "redis"
CACHE_TYPE_MEMORY = "memory"
# sqlite config
SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schema", "sqlite_tables.db")'''
SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "database", "sqlite_tables.db")
sqlite_db_config = {{
"db_path": SQLITE_DB_PATH
}}
# postgresql config - 使用MindSpider的数据库配置(如果DB_DIALECT是postgresql)或环境变量
POSTGRESQL_DB_PWD = os.getenv("POSTGRESQL_DB_PWD", "{pg_password}")
POSTGRESQL_DB_USER = os.getenv("POSTGRESQL_DB_USER", "{pg_user}")
POSTGRESQL_DB_HOST = os.getenv("POSTGRESQL_DB_HOST", "{pg_host}")
POSTGRESQL_DB_PORT = os.getenv("POSTGRESQL_DB_PORT", "{pg_port}")
POSTGRESQL_DB_NAME = os.getenv("POSTGRESQL_DB_NAME", "{pg_db_name}")
postgresql_db_config = {{
"user": POSTGRESQL_DB_USER,
"password": POSTGRESQL_DB_PWD,
"host": POSTGRESQL_DB_HOST,
"port": POSTGRESQL_DB_PORT,
"db_name": POSTGRESQL_DB_NAME,
}}
'''
# 写入新配置
with open(db_config_path, 'w', encoding='utf-8') as f:
f.write(new_config)
print("已配置MediaCrawler使用MindSpider数据库")
db_type = "PostgreSQL" if is_postgresql else "MySQL"
logger.info(f"已配置MediaCrawler使用MindSpider {db_type}数据库")
return True
except Exception as e:
print(f"配置MediaCrawler数据库失败: {e}")
logger.exception(f"配置MediaCrawler数据库失败: {e}")
return False
def create_base_config(self, platform: str, keywords: List[str],
... ... @@ -109,6 +151,11 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem
是否配置成功
"""
try:
# 判断数据库类型,确定 SAVE_DATA_OPTION
db_dialect = (config.settings.DB_DIALECT or "mysql").lower()
is_postgresql = db_dialect in ("postgresql", "postgres")
save_data_option = "postgresql" if is_postgresql else "db"
base_config_path = self.mediacrawler_path / "config" / "base_config.py"
# 将关键词列表转换为逗号分隔的字符串
... ... @@ -130,7 +177,7 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem
elif line.startswith('CRAWLER_TYPE = '):
new_lines.append(f'CRAWLER_TYPE = "{crawler_type}" # 爬取类型,search(关键词搜索) | detail(帖子详情)| creator(创作者主页数据)')
elif line.startswith('SAVE_DATA_OPTION = '):
new_lines.append('SAVE_DATA_OPTION = "db" # csv or db or json or sqlite')
new_lines.append(f'SAVE_DATA_OPTION = "{save_data_option}" # csv or db or json or sqlite or postgresql')
elif line.startswith('CRAWLER_MAX_NOTES_COUNT = '):
new_lines.append(f'CRAWLER_MAX_NOTES_COUNT = {max_notes}')
elif line.startswith('ENABLE_GET_COMMENTS = '):
... ... @@ -146,11 +193,11 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem
with open(base_config_path, 'w', encoding='utf-8') as f:
f.write('\n'.join(new_lines))
print(f"已配置 {platform} 平台,关键词数量: {len(keywords)}")
logger.info(f"已配置 {platform} 平台,爬取类型: {crawler_type},关键词数量: {len(keywords)},最大爬取数量: {max_notes},保存数据方式: {save_data_option}")
return True
except Exception as e:
print(f"创建基础配置失败: {e}")
logger.exception(f"创建基础配置失败: {e}")
return False
def run_crawler(self, platform: str, keywords: List[str],
... ... @@ -173,8 +220,9 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem
if not keywords:
raise ValueError("关键词列表不能为空")
print(f"\n开始爬取平台: {platform}")
print(f"关键词: {keywords[:5]}{'...' if len(keywords) > 5 else ''} (共{len(keywords)}个)")
start_message = f"\n开始爬取平台: {platform}"
start_message += f"\n关键词: {keywords[:5]}{'...' if len(keywords) > 5 else ''} (共{len(keywords)}个)"
logger.info(start_message)
start_time = datetime.now()
... ... @@ -187,22 +235,27 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem
if not self.create_base_config(platform, keywords, "search", max_notes):
return {"success": False, "error": "基础配置创建失败"}
# 判断数据库类型,确定 save_data_option
db_dialect = (config.settings.DB_DIALECT or "mysql").lower()
is_postgresql = db_dialect in ("postgresql", "postgres")
save_data_option = "postgresql" if is_postgresql else "db"
# 构建命令
cmd = [
sys.executable, "main.py",
"--platform", platform,
"--lt", login_type,
"--type", "search",
"--save_data_option", "db"
"--save_data_option", save_data_option
]
print(f"执行命令: {' '.join(cmd)}")
logger.info(f"执行命令: {' '.join(cmd)}")
# 切换到MediaCrawler目录并执行
result = subprocess.run(
cmd,
cwd=self.mediacrawler_path,
timeout=1800 # 30分钟超时
timeout=3600 # 60分钟超时
)
end_time = datetime.now()
... ... @@ -226,17 +279,17 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem
self.crawl_stats[platform] = crawl_stats
if result.returncode == 0:
print(f"✅ {platform} 爬取完成,耗时: {duration:.1f}秒")
logger.info(f"✅ {platform} 爬取完成,耗时: {duration:.1f}秒")
else:
print(f"❌ {platform} 爬取失败,返回码: {result.returncode}")
logger.error(f"❌ {platform} 爬取失败,返回码: {result.returncode}")
return crawl_stats
except subprocess.TimeoutExpired:
print(f"❌ {platform} 爬取超时")
logger.exception(f"❌ {platform} 爬取超时")
return {"success": False, "error": "爬取超时", "platform": platform}
except Exception as e:
print(f"❌ {platform} 爬取异常: {e}")
logger.exception(f"❌ {platform} 爬取异常: {e}")
return {"success": False, "error": str(e), "platform": platform}
def _parse_crawl_output(self, output_lines: List[str], error_lines: List[str]) -> Dict:
... ... @@ -291,10 +344,14 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem
Returns:
总体爬取统计
"""
print(f"\n🚀 开始全平台关键词爬取")
print(f" 关键词数量: {len(keywords)}")
print(f" 平台数量: {len(platforms)}")
print(f" 总爬取任务: {len(keywords)} × {len(platforms)} = {len(keywords) * len(platforms)}")
start_message = f"\n🚀 开始全平台关键词爬取"
start_message += f"\n 关键词数量: {len(keywords)}"
start_message += f"\n 平台数量: {len(platforms)}"
start_message += f"\n 登录方式: {login_type}"
start_message += f"\n 每个关键词在每个平台的最大爬取数量: {max_notes_per_keyword}"
start_message += f"\n 总爬取任务: {len(keywords)} × {len(platforms)} = {len(keywords) * len(platforms)}"
logger.info(start_message)
total_stats = {
"total_keywords": len(keywords),
... ... @@ -319,8 +376,8 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem
# 对每个平台一次性爬取所有关键词
for platform in platforms:
print(f"\n📝 在 {platform} 平台爬取所有关键词")
print(f" 关键词: {', '.join(keywords[:5])}{'...' if len(keywords) > 5 else ''}")
logger.info(f"\n📝 在 {platform} 平台爬取所有关键词")
logger.info(f" 关键词: {', '.join(keywords[:5])}{'...' if len(keywords) > 5 else ''}")
try:
# 一次性传递所有关键词给平台
... ... @@ -344,7 +401,7 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem
total_stats["keyword_results"][keyword] = {}
total_stats["keyword_results"][keyword][platform] = result
print(f" ✅ 成功: {notes_count} 条内容, {comments_count} 条评论")
logger.info(f" ✅ 成功: {notes_count} 条内容, {comments_count} 条评论")
else:
total_stats["failed_tasks"] += len(keywords)
total_stats["platform_summary"][platform]["failed_keywords"] = len(keywords)
... ... @@ -355,7 +412,7 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem
total_stats["keyword_results"][keyword] = {}
total_stats["keyword_results"][keyword][platform] = result
print(f" ❌ 失败: {result.get('error', '未知错误')}")
logger.error(f" ❌ 失败: {result.get('error', '未知错误')}")
except Exception as e:
total_stats["failed_tasks"] += len(keywords)
... ... @@ -368,22 +425,24 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem
total_stats["keyword_results"][keyword] = {}
total_stats["keyword_results"][keyword][platform] = error_result
print(f" ❌ 异常: {e}")
logger.error(f" ❌ 异常: {e}")
# 打印详细统计
print(f"\n📊 全平台关键词爬取完成!")
print(f" 总任务: {total_stats['total_tasks']}")
print(f" 成功: {total_stats['successful_tasks']}")
print(f" 失败: {total_stats['failed_tasks']}")
print(f" 成功率: {total_stats['successful_tasks']/total_stats['total_tasks']*100:.1f}%")
print(f" 总内容: {total_stats['total_notes']} 条")
print(f" 总评论: {total_stats['total_comments']} 条")
finish_message = f"\n📊 全平台关键词爬取完成!"
finish_message += f"\n 总任务: {total_stats['total_tasks']}"
finish_message += f"\n 成功: {total_stats['successful_tasks']}"
finish_message += f"\n 失败: {total_stats['failed_tasks']}"
finish_message += f"\n 成功率: {total_stats['successful_tasks']/total_stats['total_tasks']*100:.1f}%"
finish_message += f"\n 总内容: {total_stats['total_notes']} 条"
finish_message += f"\n 总评论: {total_stats['total_comments']} 条"
logger.info(finish_message)
print(f"\n📈 各平台统计:")
platform_summary_message = f"\n📈 各平台统计:"
for platform, stats in total_stats["platform_summary"].items():
success_rate = stats["successful_keywords"] / len(keywords) * 100 if keywords else 0
print(f" {platform}: {stats['successful_keywords']}/{len(keywords)} 关键词成功 ({success_rate:.1f}%), "
f"{stats['total_notes']} 条内容")
platform_summary_message += f"\n {platform}: {stats['successful_keywords']}/{len(keywords)} 关键词成功 ({success_rate:.1f}%), "
platform_summary_message += f"{stats['total_notes']} 条内容"
logger.info(platform_summary_message)
return total_stats
... ... @@ -403,9 +462,9 @@ SQLITE_DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "schem
try:
with open(log_path, 'w', encoding='utf-8') as f:
json.dump(self.crawl_stats, f, ensure_ascii=False, indent=2)
print(f"爬取日志已保存到: {log_path}")
logger.info(f"爬取日志已保存到: {log_path}")
except Exception as e:
print(f"保存爬取日志失败: {e}")
logger.exception(f"保存爬取日志失败: {e}")
if __name__ == "__main__":
# 测试平台爬虫管理器
... ... @@ -415,5 +474,5 @@ if __name__ == "__main__":
test_keywords = ["科技", "AI", "编程"]
result = crawler.run_crawler("xhs", test_keywords, max_notes=5)
print(f"测试结果: {result}")
print("平台爬虫管理器测试完成!")
logger.info(f"测试结果: {result}")
logger.info("平台爬虫管理器测试完成!")
... ...
... ... @@ -217,26 +217,54 @@ git clone https://github.com/yourusername/MindSpider.git
cd MindSpider
```
### 2. Create and activate a Conda environment
### 2. Create and activate an environment
#### Setting up with Conda
```bash
# Create a conda environment named pytorch_python11 with the given Python version
conda create -n pytorch_python11 python=3.11
# Activate the environment
conda activate pytorch_python11
```
#### Setting up with uv
> uv is a fast, lightweight Python package and environment manager, well suited to lean, low-friction dependency management. See: https://github.com/astral-sh/uv
- Install uv (if not already installed)
```bash
pip install uv
```
- Create and activate a virtual environment
```bash
uv venv --python 3.11        # create a Python 3.11 environment
source .venv/bin/activate    # Linux/macOS
# or
.venv\Scripts\activate       # Windows
```
### 3. Install dependencies
```bash
# Install Python dependencies
pip install -r requirements.txt
# or, faster, with uv
uv pip install -r requirements.txt
# Install Playwright browser drivers
playwright install
```
### 4. Configure the system
Edit `config.py` to set the database and API configuration:
Copy `.env.example` to `.env` in the project root, then edit `.env` to set the database and API configuration:
```python
# MySQL database configuration
... ... @@ -248,7 +276,9 @@ DB_NAME = "mindspider"
DB_CHARSET = "utf8mb4"
# DeepSeek API key
DEEPSEEK_API_KEY = "your_deepseek_api_key"
MINDSPIDER_BASE_URL=your_api_base_url
MINDSPIDER_API_KEY=sk-your-key
MINDSPIDER_MODEL_NAME=deepseek-chat
```
### 5. Initialize the system
... ... @@ -418,6 +448,11 @@ python main.py --status
```bash
# Reinstall
pip install playwright
uv pip install playwright
playwright install
```
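To verify the reinstall worked, a minimal smoke test can launch a headless browser (illustrative, not part of the project):

```python
from playwright.sync_api import sync_playwright

with sync_playwright() as p:
    browser = p.chromium.launch(headless=True)
    page = browser.new_page()
    page.goto("https://example.com")
    print(page.title())  # prints "Example Domain" if the driver works
    browser.close()
```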
... ...
... ... @@ -3,13 +3,33 @@
存储数据库连接信息和API密钥
"""
# MySQL数据库配置
DB_HOST = "your_host"
DB_PORT = 3306
DB_USER = "your_username"
DB_PASSWORD = "your_password"
DB_NAME = "mindspider"
DB_CHARSET = "utf8mb4"
# DeepSeek API密钥
DEEPSEEK_API_KEY = "your_deepseek_api_key"
from pydantic_settings import BaseSettings
from typing import Optional
from pydantic import Field
from pathlib import Path
# 计算 .env 优先级:优先当前工作目录,其次项目根目录(MindSpider 的上级目录)
PROJECT_ROOT: Path = Path(__file__).resolve().parents[1]
CWD_ENV: Path = Path.cwd() / ".env"
ENV_FILE: str = str(CWD_ENV if CWD_ENV.exists() else (PROJECT_ROOT / ".env"))
class Settings(BaseSettings):
"""全局配置管理,优先从环境变量和.env加载。支持MySQL/PostgreSQL统一数据库参数命名。"""
DB_DIALECT: str = Field("mysql", description="数据库类型,支持'mysql'或'postgresql'")
DB_HOST: str = Field("your_host", description="数据库主机名或IP地址")
DB_PORT: int = Field(3306, description="数据库端口号")
DB_USER: str = Field("your_username", description="数据库用户名")
DB_PASSWORD: str = Field("your_password", description="数据库密码")
DB_NAME: str = Field("mindspider", description="数据库名称")
DB_CHARSET: str = Field("utf8mb4", description="数据库字符集")
MINDSPIDER_API_KEY: Optional[str] = Field(None, description="MINDSPIDER API密钥")
MINDSPIDER_BASE_URL: Optional[str] = Field("https://api.deepseek.com", description="MINDSPIDER API基础URL,推荐deepseek-chat模型使用https://api.deepseek.com")
MINDSPIDER_MODEL_NAME: Optional[str] = Field("deepseek-chat", description="MINDSPIDER API模型名称, 推荐deepseek-chat")
class Config:
env_file = ENV_FILE
env_prefix = ""
case_sensitive = False
extra = "allow"
settings = Settings()
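Because `case_sensitive = False` and the `.env` lookup prefers the current working directory, values can also be supplied straight from the environment, which takes precedence over `.env`. A minimal usage sketch (the values are placeholders):

```python
import os

# environment variables win over .env values; set them before the first import
os.environ["DB_DIALECT"] = "postgresql"
os.environ["DB_HOST"] = "localhost"

from config import settings  # assumes the project root is on sys.path

print(settings.DB_DIALECT)  # -> "postgresql"
print(settings.DB_PORT)     # -> 3306 unless overridden
```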
... ...
... ... @@ -11,8 +11,13 @@ import argparse
from datetime import date, datetime
from pathlib import Path
import subprocess
import asyncio
import pymysql
from pymysql.cursors import DictCursor
from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine
from sqlalchemy import inspect, text
from config import settings
from loguru import logger
# 添加项目根目录到路径
project_root = Path(__file__).parent
... ... @@ -21,8 +26,8 @@ sys.path.append(str(project_root))
try:
import config
except ImportError:
print("错误:无法导入config.py配置文件")
print("请确保项目根目录下存在config.py文件,并包含数据库和API配置信息")
logger.error("错误:无法导入config.py配置文件")
logger.error("请确保项目根目录下存在config.py文件,并包含数据库和API配置信息")
sys.exit(1)
class MindSpider:
... ... @@ -35,99 +40,110 @@ class MindSpider:
self.deep_sentiment_path = self.project_root / "DeepSentimentCrawling"
self.schema_path = self.project_root / "schema"
print("MindSpider AI爬虫项目")
print(f"项目路径: {self.project_root}")
logger.info("MindSpider AI爬虫项目")
logger.info(f"项目路径: {self.project_root}")
def check_config(self) -> bool:
"""检查基础配置"""
print("\n检查基础配置...")
logger.info("检查基础配置...")
# 检查config.py配置项
# 检查settings配置项
required_configs = [
'DB_HOST', 'DB_PORT', 'DB_USER', 'DB_PASSWORD', 'DB_NAME', 'DB_CHARSET',
'DEEPSEEK_API_KEY'
'MINDSPIDER_API_KEY', 'MINDSPIDER_BASE_URL', 'MINDSPIDER_MODEL_NAME'
]
missing_configs = []
for config_name in required_configs:
if not hasattr(config, config_name) or not getattr(config, config_name):
if not hasattr(settings, config_name) or not getattr(settings, config_name):
missing_configs.append(config_name)
if missing_configs:
print(f"配置缺失: {', '.join(missing_configs)}")
print("请检查config.py文件中的配置信息")
logger.error(f"配置缺失: {', '.join(missing_configs)}")
logger.error("请检查config.py文件中的配置信息")
return False
print("基础配置检查通过")
logger.info("基础配置检查通过")
return True
def check_database_connection(self) -> bool:
"""检查数据库连接"""
print("\n检查数据库连接...")
try:
connection = pymysql.connect(
host=config.DB_HOST,
port=config.DB_PORT,
user=config.DB_USER,
password=config.DB_PASSWORD,
database=config.DB_NAME,
charset=config.DB_CHARSET,
cursorclass=DictCursor
logger.info("检查数据库连接...")
def build_async_url() -> str:
dialect = (settings.DB_DIALECT or "mysql").lower()
if dialect == "postgresql":
return f"postgresql+asyncpg://{settings.DB_USER}:{settings.DB_PASSWORD}@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"
# 默认使用 mysql 异步驱动 asyncmy
return (
f"mysql+asyncmy://{settings.DB_USER}:{settings.DB_PASSWORD}"
f"@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}?charset={settings.DB_CHARSET}"
)
connection.close()
print("数据库连接正常")
async def _test_connection(db_url: str) -> None:
engine: AsyncEngine = create_async_engine(db_url, pool_pre_ping=True)
try:
async with engine.connect() as conn:
await conn.execute(text("SELECT 1"))
finally:
await engine.dispose()
try:
db_url: str = build_async_url()
asyncio.run(_test_connection(db_url))
logger.info("数据库连接正常")
return True
except Exception as e:
print(f"数据库连接失败: {e}")
logger.exception(f"数据库连接失败: {e}")
return False
def check_database_tables(self) -> bool:
"""检查数据库表是否存在"""
print("\n检查数据库表...")
try:
connection = pymysql.connect(
host=config.DB_HOST,
port=config.DB_PORT,
user=config.DB_USER,
password=config.DB_PASSWORD,
database=config.DB_NAME,
charset=config.DB_CHARSET,
cursorclass=DictCursor
logger.info("检查数据库表...")
def build_async_url() -> str:
dialect = (settings.DB_DIALECT or "mysql").lower()
if dialect == "postgresql":
return f"postgresql+asyncpg://{settings.DB_USER}:{settings.DB_PASSWORD}@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"
return (
f"mysql+asyncmy://{settings.DB_USER}:{settings.DB_PASSWORD}"
f"@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}?charset={settings.DB_CHARSET}"
)
cursor = connection.cursor()
# 检查核心表是否存在
async def _check_tables(db_url: str) -> list[str]:
engine: AsyncEngine = create_async_engine(db_url, pool_pre_ping=True)
try:
async with engine.connect() as conn:
def _get_tables(sync_conn):
return inspect(sync_conn).get_table_names()
tables = await conn.run_sync(_get_tables)
return tables
finally:
await engine.dispose()
try:
db_url: str = build_async_url()
existing_tables = asyncio.run(_check_tables(db_url))
required_tables = ['daily_news', 'daily_topics']
cursor.execute("SHOW TABLES")
existing_tables = [row[f'Tables_in_{config.DB_NAME}'] for row in cursor.fetchall()]
missing_tables = [table for table in required_tables if table not in existing_tables]
connection.close()
missing_tables = [t for t in required_tables if t not in existing_tables]
if missing_tables:
print(f"缺少数据库表: {', '.join(missing_tables)}")
logger.error(f"缺少数据库表: {', '.join(missing_tables)}")
return False
else:
print("数据库表检查通过")
return True
logger.info("数据库表检查通过")
return True
except Exception as e:
print(f"检查数据库表失败: {e}")
logger.exception(f"检查数据库表失败: {e}")
return False
def initialize_database(self) -> bool:
"""初始化数据库"""
print("\n初始化数据库...")
logger.info("初始化数据库...")
try:
# 运行数据库初始化脚本
init_script = self.schema_path / "init_database.py"
if not init_script.exists():
print("错误:找不到数据库初始化脚本")
logger.error("错误:找不到数据库初始化脚本")
return False
result = subprocess.run(
... ... @@ -138,19 +154,19 @@ class MindSpider:
)
if result.returncode == 0:
print("数据库初始化成功")
logger.info("数据库初始化成功")
return True
else:
print(f"数据库初始化失败: {result.stderr}")
logger.error(f"数据库初始化失败: {result.stderr}")
return False
except Exception as e:
print(f"数据库初始化异常: {e}")
logger.exception(f"数据库初始化异常: {e}")
return False
def check_dependencies(self) -> bool:
"""检查依赖环境"""
print("\n检查依赖环境...")
logger.info("检查依赖环境...")
# 检查Python包
required_packages = ['pymysql', 'requests', 'playwright']
... ... @@ -163,22 +179,22 @@ class MindSpider:
missing_packages.append(package)
if missing_packages:
print(f"缺少Python包: {', '.join(missing_packages)}")
print("请运行: pip install -r requirements.txt")
logger.error(f"缺少Python包: {', '.join(missing_packages)}")
logger.info("请运行: pip install -r requirements.txt")
return False
# 检查MediaCrawler依赖
mediacrawler_path = self.deep_sentiment_path / "MediaCrawler"
if not mediacrawler_path.exists():
print("错误:找不到MediaCrawler目录")
logger.error("错误:找不到MediaCrawler目录")
return False
print("依赖环境检查通过")
logger.info("依赖环境检查通过")
return True
def run_broad_topic_extraction(self, extract_date: date = None, keywords_count: int = 100) -> bool:
"""运行BroadTopicExtraction模块"""
print(f"\n运行BroadTopicExtraction模块...")
logger.info("运行BroadTopicExtraction模块...")
if not extract_date:
extract_date = date.today()
... ... @@ -186,11 +202,10 @@ class MindSpider:
try:
cmd = [
sys.executable, "main.py",
"--date", extract_date.strftime("%Y-%m-%d"),
"--keywords", str(keywords_count)
]
print(f"执行命令: {' '.join(cmd)}")
logger.info(f"执行命令: {' '.join(cmd)}")
result = subprocess.run(
cmd,
... ... @@ -199,24 +214,24 @@ class MindSpider:
)
if result.returncode == 0:
print("BroadTopicExtraction模块执行成功")
logger.info("BroadTopicExtraction模块执行成功")
return True
else:
print(f"BroadTopicExtraction模块执行失败,返回码: {result.returncode}")
logger.error(f"BroadTopicExtraction模块执行失败,返回码: {result.returncode}")
return False
except subprocess.TimeoutExpired:
print("BroadTopicExtraction模块执行超时")
logger.error("BroadTopicExtraction模块执行超时")
return False
except Exception as e:
print(f"BroadTopicExtraction模块执行异常: {e}")
logger.exception(f"BroadTopicExtraction模块执行异常: {e}")
return False
def run_deep_sentiment_crawling(self, target_date: date = None, platforms: list = None,
max_keywords: int = 50, max_notes: int = 50,
test_mode: bool = False) -> bool:
"""运行DeepSentimentCrawling模块"""
print(f"\n运行DeepSentimentCrawling模块...")
logger.info("运行DeepSentimentCrawling模块...")
if not target_date:
target_date = date.today()
... ... @@ -238,7 +253,7 @@ class MindSpider:
if test_mode:
cmd.append("--test")
print(f"执行命令: {' '.join(cmd)}")
logger.info(f"执行命令: {' '.join(cmd)}")
result = subprocess.run(
cmd,
... ... @@ -247,78 +262,78 @@ class MindSpider:
)
if result.returncode == 0:
print("DeepSentimentCrawling模块执行成功")
logger.info("DeepSentimentCrawling模块执行成功")
return True
else:
print(f"DeepSentimentCrawling模块执行失败,返回码: {result.returncode}")
logger.error(f"DeepSentimentCrawling模块执行失败,返回码: {result.returncode}")
return False
except subprocess.TimeoutExpired:
print("DeepSentimentCrawling模块执行超时")
logger.error("DeepSentimentCrawling模块执行超时")
return False
except Exception as e:
print(f"DeepSentimentCrawling模块执行异常: {e}")
logger.exception(f"DeepSentimentCrawling模块执行异常: {e}")
return False
def run_complete_workflow(self, target_date: date = None, platforms: list = None,
keywords_count: int = 100, max_keywords: int = 50,
max_notes: int = 50, test_mode: bool = False) -> bool:
"""运行完整工作流程"""
print(f"\n开始完整的MindSpider工作流程")
logger.info("开始完整的MindSpider工作流程")
if not target_date:
target_date = date.today()
print(f"目标日期: {target_date}")
print(f"平台列表: {platforms if platforms else '所有支持的平台'}")
print(f"测试模式: {'是' if test_mode else '否'}")
logger.info(f"目标日期: {target_date}")
logger.info(f"平台列表: {platforms if platforms else '所有支持的平台'}")
logger.info(f"测试模式: {'是' if test_mode else '否'}")
# 第一步:运行话题提取
print(f"\n=== 第一步:话题提取 ===")
logger.info("=== 第一步:话题提取 ===")
if not self.run_broad_topic_extraction(target_date, keywords_count):
print("话题提取失败,终止流程")
logger.error("话题提取失败,终止流程")
return False
# 第二步:运行情感爬取
print(f"\n=== 第二步:情感爬取 ===")
logger.info("=== 第二步:情感爬取 ===")
if not self.run_deep_sentiment_crawling(target_date, platforms, max_keywords, max_notes, test_mode):
print("情感爬取失败,但话题提取已完成")
logger.error("情感爬取失败,但话题提取已完成")
return False
print(f"\n完整工作流程执行成功!")
logger.info("完整工作流程执行成功!")
return True
def show_status(self):
"""显示项目状态"""
print(f"\nMindSpider项目状态:")
print(f"项目路径: {self.project_root}")
logger.info("MindSpider项目状态:")
logger.info(f"项目路径: {self.project_root}")
# 配置状态
config_ok = self.check_config()
print(f"配置状态: {'正常' if config_ok else '异常'}")
logger.info(f"配置状态: {'正常' if config_ok else '异常'}")
# 数据库状态
if config_ok:
db_conn_ok = self.check_database_connection()
print(f"数据库连接: {'正常' if db_conn_ok else '异常'}")
logger.info(f"数据库连接: {'正常' if db_conn_ok else '异常'}")
if db_conn_ok:
db_tables_ok = self.check_database_tables()
print(f"数据库表: {'正常' if db_tables_ok else '需要初始化'}")
logger.info(f"数据库表: {'正常' if db_tables_ok else '需要初始化'}")
# 依赖状态
deps_ok = self.check_dependencies()
print(f"依赖环境: {'正常' if deps_ok else '异常'}")
logger.info(f"依赖环境: {'正常' if deps_ok else '异常'}")
# 模块状态
broad_topic_exists = self.broad_topic_path.exists()
deep_sentiment_exists = self.deep_sentiment_path.exists()
print(f"BroadTopicExtraction模块: {'存在' if broad_topic_exists else '缺失'}")
print(f"DeepSentimentCrawling模块: {'存在' if deep_sentiment_exists else '缺失'}")
logger.info(f"BroadTopicExtraction模块: {'存在' if broad_topic_exists else '缺失'}")
logger.info(f"DeepSentimentCrawling模块: {'存在' if deep_sentiment_exists else '缺失'}")
def setup_project(self) -> bool:
"""项目初始化设置"""
print(f"\n开始MindSpider项目初始化...")
logger.info("开始MindSpider项目初始化...")
# 1. 检查配置
if not self.check_config():
... ... @@ -334,11 +349,11 @@ class MindSpider:
# 4. 检查并初始化数据库表
if not self.check_database_tables():
print("需要初始化数据库表...")
logger.info("需要初始化数据库表...")
if not self.initialize_database():
return False
print(f"\nMindSpider项目初始化完成!")
logger.info("MindSpider项目初始化完成!")
return True
def main():
... ... @@ -373,7 +388,7 @@ def main():
try:
target_date = datetime.strptime(args.date, "%Y-%m-%d").date()
except ValueError:
print("错误:日期格式不正确,请使用 YYYY-MM-DD 格式")
logger.error("错误:日期格式不正确,请使用 YYYY-MM-DD 格式")
return
# 创建MindSpider实例
... ... @@ -388,17 +403,17 @@ def main():
# 项目设置
if args.setup:
if spider.setup_project():
print("项目设置完成,可以开始使用MindSpider!")
logger.info("项目设置完成,可以开始使用MindSpider!")
else:
print("项目设置失败,请检查配置和环境")
logger.error("项目设置失败,请检查配置和环境")
return
# 初始化数据库
if args.init_db:
if spider.initialize_database():
print("数据库初始化成功")
logger.info("数据库初始化成功")
else:
print("数据库初始化失败")
logger.error("数据库初始化失败")
return
# 运行模块
... ... @@ -415,16 +430,16 @@ def main():
)
else:
# 默认运行完整工作流程
print("运行完整MindSpider工作流程...")
logger.info("运行完整MindSpider工作流程...")
spider.run_complete_workflow(
target_date, args.platforms, args.keywords_count,
args.max_keywords, args.max_notes, args.test
)
except KeyboardInterrupt:
print("\n用户中断操作")
logger.info("用户中断操作")
except Exception as e:
print(f"\n执行出错: {e}")
logger.exception(f"执行出错: {e}")
if __name__ == "__main__":
main()
... ...
... ... @@ -7,6 +7,8 @@
pymysql==1.1.0
aiomysql==0.2.0
aiosqlite==0.21.0
asyncpg
asyncmy
psycopg[binary]
sqlalchemy
# ===============================
# HTTP请求和网络
... ... @@ -42,6 +44,8 @@ wordcloud==1.9.3
matplotlib==3.9.0
parsel==1.9.1
pyexecjs==1.5.1
typer>=0.12.3
pyhumps==3.8.0
# ===============================
# 工具包
... ...
... ... @@ -7,10 +7,12 @@ MindSpider AI爬虫项目 - 数据库管理工具
import os
import sys
import pymysql
from sqlalchemy import create_engine, text, inspect
from sqlalchemy.engine import Engine
import argparse
from pathlib import Path
from datetime import datetime, timedelta
from loguru import logger
# 添加项目根目录到路径
project_root = Path(__file__).parent.parent
... ... @@ -19,125 +21,132 @@ sys.path.append(str(project_root))
try:
import config
except ImportError:
print("错误: 无法导入config.py配置文件")
logger.error("错误: 无法导入config.py配置文件")
sys.exit(1)
from config import settings
class DatabaseManager:
def __init__(self):
self.connection = None
self.engine: Engine = None
self.connect()
def connect(self):
"""连接数据库"""
try:
self.connection = pymysql.connect(
host=config.DB_HOST,
port=config.DB_PORT,
user=config.DB_USER,
password=config.DB_PASSWORD,
database=config.DB_NAME,
charset=config.DB_CHARSET,
autocommit=True
)
print(f"成功连接到数据库: {config.DB_NAME}")
dialect = (settings.DB_DIALECT or "mysql").lower()
if dialect in ("postgresql", "postgres"):
url = f"postgresql+psycopg://{settings.DB_USER}:{settings.DB_PASSWORD}@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"
else:
url = f"mysql+pymysql://{settings.DB_USER}:{settings.DB_PASSWORD}@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}?charset={settings.DB_CHARSET}"
self.engine = create_engine(url, future=True)
logger.info(f"成功连接到数据库: {settings.DB_NAME}")
except Exception as e:
print(f"数据库连接失败: {e}")
logger.error(f"数据库连接失败: {e}")
sys.exit(1)
def close(self):
"""关闭数据库连接"""
if self.connection:
self.connection.close()
if self.engine:
self.engine.dispose()
def show_tables(self):
"""显示所有表"""
print("\n" + "=" * 60)
print("数据库表列表")
print("=" * 60)
data_list_message = "\n" + "=" * 60 + "\n数据库表列表\n" + "=" * 60 + "\n"
cursor = self.connection.cursor()
cursor.execute("SHOW TABLES")
tables = cursor.fetchall()
inspector = inspect(self.engine)
tables = inspector.get_table_names()
if not tables:
print("数据库中没有表")
logger.info("数据库中没有表")
return
# 分类显示表
mindspider_tables = []
mediacrawler_tables = []
for table in tables:
table_name = table[0]
for table_name in tables:
if table_name in ['daily_news', 'daily_topics', 'topic_news_relation', 'crawling_tasks']:
mindspider_tables.append(table_name)
else:
mediacrawler_tables.append(table_name)
print("MindSpider核心表:")
data_list_message += "MindSpider核心表:"
data_list_message += "\n"
for table in mindspider_tables:
cursor.execute(f"SELECT COUNT(*) FROM {table}")
count = cursor.fetchone()[0]
print(f" - {table:<25} ({count:>6} 条记录)")
with self.engine.connect() as conn:
count = conn.execute(text(f"SELECT COUNT(*) FROM {table}")).scalar_one()
data_list_message += f" - {table:<25} ({count:>6} 条记录)"
data_list_message += "\n"
print("\nMediaCrawler平台表:")
data_list_message += "\nMediaCrawler平台表:"
data_list_message += "\n"
for table in mediacrawler_tables:
try:
cursor.execute(f"SELECT COUNT(*) FROM {table}")
count = cursor.fetchone()[0]
print(f" - {table:<25} ({count:>6} 条记录)")
with self.engine.connect() as conn:
count = conn.execute(text(f"SELECT COUNT(*) FROM {table}")).scalar_one()
data_list_message += f" - {table:<25} ({count:>6} 条记录)"
data_list_message += "\n"
except:
print(f" - {table:<25} (查询失败)")
data_list_message += f" - {table:<25} (查询失败)"
data_list_message += "\n"
logger.info(data_list_message)
def show_statistics(self):
"""显示数据统计"""
print("\n" + "=" * 60)
print("数据统计")
print("=" * 60)
cursor = self.connection.cursor()
data_statistics_message = "\n" + "=" * 60 + "\n数据统计\n" + "=" * 60 + "\n"
try:
# 新闻统计
cursor.execute("SELECT COUNT(*) FROM daily_news")
news_count = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(DISTINCT crawl_date) FROM daily_news")
news_days = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(DISTINCT source_platform) FROM daily_news")
platforms = cursor.fetchone()[0]
print(f"新闻数据:")
print(f" - 总新闻数: {news_count}")
print(f" - 覆盖天数: {news_days}")
print(f" - 新闻平台: {platforms}")
with self.engine.connect() as conn:
news_count = conn.execute(text("SELECT COUNT(*) FROM daily_news")).scalar_one()
news_days = conn.execute(text("SELECT COUNT(DISTINCT crawl_date) FROM daily_news")).scalar_one()
platforms = conn.execute(text("SELECT COUNT(DISTINCT source_platform) FROM daily_news")).scalar_one()
data_statistics_message += "新闻数据:"
data_statistics_message += "\n"
data_statistics_message += f" - 总新闻数: {news_count}"
data_statistics_message += "\n"
data_statistics_message += f" - 覆盖天数: {news_days}"
data_statistics_message += "\n"
data_statistics_message += f" - 新闻平台: {platforms}"
data_statistics_message += "\n"
# 话题统计
cursor.execute("SELECT COUNT(*) FROM daily_topics")
topic_count = cursor.fetchone()[0]
with self.engine.connect() as conn:
topic_count = conn.execute(text("SELECT COUNT(*) FROM daily_topics")).scalar_one()
topic_days = conn.execute(text("SELECT COUNT(DISTINCT extract_date) FROM daily_topics")).scalar_one()
cursor.execute("SELECT COUNT(DISTINCT extract_date) FROM daily_topics")
topic_days = cursor.fetchone()[0]
print(f"\n话题数据:")
print(f" - 总话题数: {topic_count}")
print(f" - 提取天数: {topic_days}")
data_statistics_message += "话题数据:"
data_statistics_message += "\n"
data_statistics_message += f" - 总话题数: {topic_count}"
data_statistics_message += "\n"
data_statistics_message += f" - 提取天数: {topic_days}"
data_statistics_message += "\n"
# 爬取任务统计
cursor.execute("SELECT COUNT(*) FROM crawling_tasks")
task_count = cursor.fetchone()[0]
cursor.execute("SELECT task_status, COUNT(*) FROM crawling_tasks GROUP BY task_status")
task_status = cursor.fetchall()
with self.engine.connect() as conn:
task_count = conn.execute(text("SELECT COUNT(*) FROM crawling_tasks")).scalar_one()
task_status = conn.execute(text("SELECT task_status, COUNT(*) FROM crawling_tasks GROUP BY task_status")).all()
print(f"\n爬取任务:")
print(f" - 总任务数: {task_count}")
data_statistics_message += "爬取任务:"
data_statistics_message += "\n"
data_statistics_message += f" - 总任务数: {task_count}"
data_statistics_message += "\n"
for status, count in task_status:
print(f" - {status}: {count}")
data_statistics_message += f" - {status}: {count}"
data_statistics_message += "\n"
# 爬取内容统计
print(f"\n平台内容统计:")
data_statistics_message += "平台内容统计:"
data_statistics_message += "\n"
platform_tables = {
'xhs_note': '小红书',
'douyin_aweme': '抖音',
... ... @@ -150,60 +159,78 @@ class DatabaseManager:
            for table, platform in platform_tables.items():
                try:
                    with self.engine.connect() as conn:
                        count = conn.execute(text(f"SELECT COUNT(*) FROM {table}")).scalar_one()
                    data_statistics_message += f" - {platform}: {count}\n"
                except Exception:
                    data_statistics_message += f" - {platform}: table does not exist\n"
            logger.info(data_statistics_message)
        except Exception as e:
            data_statistics_message += f"Statistics query failed: {e}\n"
            logger.error(data_statistics_message)
    def show_recent_data(self, days=7):
        """Log per-day news and topic counts for the last `days` days."""
        from datetime import date, timedelta

        data_recent_message = "\n" + "=" * 60 + f"\nData for the last {days} days\n" + "=" * 60 + "\n"
        start_date = date.today() - timedelta(days=days)

        # Recent news: a bound :start_date computed in Python replaces the MySQL-only
        # DATE_SUB(CURDATE(), INTERVAL n DAY), so the query also runs on PostgreSQL
        with self.engine.connect() as conn:
            news_data = conn.execute(
                text(
                    """
                    SELECT crawl_date, COUNT(*) as news_count, COUNT(DISTINCT source_platform) as platforms
                    FROM daily_news
                    WHERE crawl_date >= :start_date
                    GROUP BY crawl_date
                    ORDER BY crawl_date DESC
                    """
                ),
                {"start_date": start_date},
            ).all()
        if news_data:
            data_recent_message += "Daily news counts:\n"
            for day, count, platforms in news_data:
                data_recent_message += f" {day}: {count} news items, {platforms} platforms\n"

        # Recent topics
        with self.engine.connect() as conn:
            topic_data = conn.execute(
                text(
                    """
                    SELECT extract_date, COUNT(*) as topic_count
                    FROM daily_topics
                    WHERE extract_date >= :start_date
                    GROUP BY extract_date
                    ORDER BY extract_date DESC
                    """
                ),
                {"start_date": start_date},
            ).all()
        if topic_data:
            data_recent_message += "\nDaily topic counts:\n"
            for day, count in topic_data:
                data_recent_message += f" {day}: {count} topics\n"
        logger.info(data_recent_message)
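    # Equivalence sketch (with days=7): the previous MySQL-only predicate
    #   WHERE crawl_date >= DATE_SUB(CURDATE(), INTERVAL 7 DAY)
    # becomes a cutoff computed in Python and bound as a parameter on either backend:
    #   start_date = date.today() - timedelta(days=7)  =>  WHERE crawl_date >= :start_date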
    def cleanup_old_data(self, days=90, dry_run=True):
        """Delete data older than `days` days (preview only unless dry_run=False)."""
        from datetime import datetime, timedelta

        cleanup_message = "\n" + "=" * 60 + f"\nCleaning data older than {days} days ({'preview mode' if dry_run else 'execute mode'})\n" + "=" * 60 + "\n"
        cutoff_date = datetime.now() - timedelta(days=days)
        # Count the rows that would be deleted
... ... @@ -213,20 +240,25 @@ class DatabaseManager:
("crawling_tasks", f"SELECT COUNT(*) FROM crawling_tasks WHERE scheduled_date < '{cutoff_date.date()}'")
]
        with self.engine.begin() as conn:
            for table, query in cleanup_queries:
                count = conn.execute(text(query)).scalar_one()
                if count > 0:
                    cleanup_message += f" {table}: {count} rows to be deleted\n"
                    if not dry_run:
                        # Reuse the COUNT query as a DELETE with the same WHERE clause
                        delete_query = query.replace("SELECT COUNT(*)", "DELETE")
                        conn.execute(text(delete_query))
                        cleanup_message += f" Deleted {count} rows\n"
                else:
                    cleanup_message += f" {table}: nothing to clean\n"
        if dry_run:
            cleanup_message += "\nPreview mode: no data was actually deleted. Pass --execute to perform the cleanup.\n"
        logger.info(cleanup_message)
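    # Usage sketch for the methods above:
    #   db = DatabaseManager()
    #   db.cleanup_old_data(days=90, dry_run=True)   # report only, nothing deleted
    #   db.cleanup_old_data(days=90, dry_run=False)  # actually delete old rows
    #   db.close()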
def main():
    parser = argparse.ArgumentParser(description="MindSpider database management tool")
... ...
... ... @@ -9,6 +9,7 @@ import os
import sys
import pymysql
from pathlib import Path

from MindSpider.config import settings

# Add the project root to sys.path
project_root = Path(__file__).parent.parent
... ... @@ -26,14 +27,14 @@ def create_database_connection():
"""创建数据库连接"""
try:
connection = pymysql.connect(
host=config.DB_HOST,
port=config.DB_PORT,
user=config.DB_USER,
password=config.DB_PASSWORD,
charset=config.DB_CHARSET,
host=settings.db_host,
port=settings.db_port,
user=settings.db_user,
password=settings.db_password,
charset=settings.db_charset,
autocommit=True
)
print(f"成功连接到MySQL服务器: {config.DB_HOST}:{config.DB_PORT}")
print(f"成功连接到MySQL服务器: {settings.db_host}:{settings.db_port}")
return connection
except Exception as e:
print(f"连接数据库失败: {e}")
... ... @@ -43,9 +44,9 @@ def create_database(connection):
"""创建数据库"""
try:
cursor = connection.cursor()
cursor.execute(f"CREATE DATABASE IF NOT EXISTS `{config.DB_NAME}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci")
cursor.execute(f"USE `{config.DB_NAME}`")
print(f"数据库 '{config.DB_NAME}' 创建/选择成功")
cursor.execute(f"CREATE DATABASE IF NOT EXISTS `{settings.db_name}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci")
cursor.execute(f"USE `{settings.db_name}`")
print(f"数据库 '{settings.db_name}' 创建/选择成功")
return True
except Exception as e:
print(f"创建数据库失败: {e}")
... ... @@ -56,18 +57,18 @@ def execute_sql_file(connection, sql_file_path, description=""):
    if not os.path.exists(sql_file_path):
        print(f"Warning: SQL file does not exist: {sql_file_path}")
        return False

    try:
        cursor = connection.cursor()

        with open(sql_file_path, 'r', encoding='utf-8') as f:
            sql_content = f.read()

        # Split into individual statements (naive approach: split on semicolons)
        sql_statements = [stmt.strip() for stmt in sql_content.split(';') if stmt.strip()]

        success_count = 0
        error_count = 0

        for stmt in sql_statements:
            if not stmt or stmt.startswith('--'):
                continue
... ... @@ -77,10 +78,10 @@ def execute_sql_file(connection, sql_file_path, description=""):
            except Exception as e:
                error_count += 1
                print(f"Failed to execute SQL statement: {str(e)[:100]}...")

        print(f"{description} - executed: {success_count} statements, failed: {error_count}")
        return error_count == 0

    except Exception as e:
        print(f"Failed to execute SQL file {sql_file_path}: {e}")
        return False
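# Caveat: splitting on ';' is deliberately simple and would break on statements that embed
# semicolons (stored procedures, triggers, or string literals containing ';'); the bundled
# schema files are assumed not to contain such constructs.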
... ... @@ -90,44 +91,44 @@ def main():
print("=" * 60)
print("MindSpider AI爬虫项目 - 数据库初始化")
print("=" * 60)
# 检查配置
print("检查数据库配置...")
print(f"数据库主机: {config.DB_HOST}")
print(f"数据库端口: {config.DB_PORT}")
print(f"数据库名称: {config.DB_NAME}")
print(f"数据库用户: {config.DB_USER}")
print(f"字符集: {config.DB_CHARSET}")
print(f"数据库主机: {settings.db_host}")
print(f"数据库端口: {settings.db_port}")
print(f"数据库名称: {settings.db_name}")
print(f"数据库用户: {settings.db_user}")
print(f"字符集: {settings.db_charset}")
print()
# 创建数据库连接
print("正在连接数据库...")
connection = create_database_connection()
if not connection:
print("数据库初始化失败!")
return False
try:
# 创建数据库
print("正在创建/选择数据库...")
if not create_database(connection):
return False
# 获取SQL文件路径
schema_dir = Path(__file__).parent
mediacrawler_sql = schema_dir.parent / "DeepSentimentCrawling" / "MediaCrawler" / "schema" / "tables.sql"
mindspider_sql = schema_dir / "mindspider_tables.sql"
print()
print("开始执行SQL脚本...")
        # 1. MediaCrawler base tables
        if mediacrawler_sql.exists():
            print("1. Creating MediaCrawler base tables...")
            execute_sql_file(connection, str(mediacrawler_sql), "MediaCrawler base tables")
        else:
            print("Warning: MediaCrawler SQL file not found; skipping base table creation")

        # 2. MindSpider extension tables
        print("2. Creating MindSpider extension tables...")
        if mindspider_sql.exists():
... ... @@ -135,18 +136,18 @@ def main():
        else:
            print("Error: MindSpider SQL file not found")
            return False

        print()
        print("=" * 60)
        print("Database initialization complete!")
        print("=" * 60)

        # Show the created tables
        cursor = connection.cursor()
        cursor.execute("SHOW TABLES")
        tables = cursor.fetchall()
        print(f"Created {len(tables)} tables in database '{settings.db_name}':")
        for table in tables:
            print(f" - {table[0]}")
... ...