# database_manager.py 10.3 KB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
BroadTopicExtraction模块 - 数据库管理器
只负责新闻数据和话题分析的存储和查询
"""

import sys
import json
from datetime import datetime, date
from pathlib import Path
from typing import List, Dict, Optional
import pymysql
from pymysql.cursors import DictCursor

# Add the project root directory (two levels above this file) to the import
# path so that the top-level ``config`` module can be imported below.
project_root = Path(__file__).parent.parent
if str(project_root) not in sys.path:
    # Guard against piling up duplicate entries when the module is re-imported.
    sys.path.append(str(project_root))

try:
    import config
except ImportError as exc:
    # Chain explicitly so the underlying failure (missing file, or an import
    # error raised from inside config.py itself) is reported as the cause.
    raise ImportError("无法导入config.py配置文件") from exc

class DatabaseManager:
    """Store and query daily news rows and the per-day topic analysis.

    The connection is opened when the manager is constructed. The manager can
    be used as a context manager; the connection is closed on exit.
    """

    def __init__(self):
        """Create the manager and immediately open the database connection.

        Raises:
            Exception: whatever ``connect`` re-raises when connecting fails.
        """
        self.connection = None
        self.connect()

    def connect(self):
        """Open the database connection using the settings in ``config``.

        The connection is created with autocommit enabled and ``DictCursor``
        as the cursor class, so fetched rows are dictionaries.

        Raises:
            Exception: the original connection error, re-raised after it has
                been printed.
        """
        try:
            self.connection = pymysql.connect(
                host=config.DB_HOST,
                port=config.DB_PORT,
                user=config.DB_USER,
                password=config.DB_PASSWORD,
                database=config.DB_NAME,
                charset=config.DB_CHARSET,
                autocommit=True,
                cursorclass=DictCursor
            )
            print(f"成功连接到数据库: {config.DB_NAME}")
        except Exception as e:
            print(f"数据库连接失败: {e}")
            raise

    def close(self):
        """Close the database connection if one is open.

        Safe to call more than once: the reference is dropped after the first
        call, so later calls are no-ops instead of closing a closed connection.
        """
        if self.connection is None:
            return
        try:
            self.connection.close()
        finally:
            # Drop the reference even if close() raised, so that a later
            # close() / __exit__ does not try to close the same object again.
            self.connection = None
        print("数据库连接已关闭")

    def __enter__(self):
        """Return the manager itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection on leaving the ``with`` block.

        Exceptions raised inside the block are not suppressed.
        """
        self.close()

    @staticmethod
    def _parse_keywords(raw_keywords):
        """Decode the stored ``keywords`` value into a Python object.

        Args:
            raw_keywords: value read from the ``keywords`` column; normally a
                JSON string as written by ``save_daily_topics``.

        Returns:
            The decoded JSON value. An empty list is returned when the value
            is ``None``/empty or is not valid JSON, so that one bad row does
            not make the caller lose the whole record or result set.
        """
        if raw_keywords is None or raw_keywords == '':
            return []
        if isinstance(raw_keywords, (list, tuple)):
            # Already decoded (e.g. by the driver); nothing to parse.
            return list(raw_keywords)
        try:
            return json.loads(raw_keywords)
        except (TypeError, ValueError) as e:
            # json.JSONDecodeError is a ValueError subclass.
            print(f"解析话题关键词失败: {e}")
            return []

    # ==================== News data operations ====================

    def save_daily_news(self, news_data: List[Dict], crawl_date: Optional[date] = None) -> int:
        """Save the news for one day, replacing any rows already stored for it.

        All rows with the given ``crawl_date`` are deleted first and the new
        items are then inserted. Both steps run inside one explicit
        transaction, so if the operation as a whole fails the deletion is
        rolled back instead of leaving the day empty.
        NOTE(review): the rollback only has an effect if ``daily_news`` uses a
        transactional storage engine — confirm against the schema.

        A failure on a single item is printed and skipped; the remaining
        items are still saved. Note that an empty ``news_data`` still deletes
        the existing rows for the day.

        Args:
            news_data: news items; each is read with ``.get`` for the keys
                ``source``, ``id``, ``rank``, ``title`` and ``url``.
            crawl_date: crawl date, defaults to today.

        Returns:
            Number of items inserted, or 0 if the operation failed as a whole.
        """
        if not crawl_date:
            crawl_date = date.today()

        current_timestamp = int(datetime.now().timestamp())

        delete_query = "DELETE FROM daily_news WHERE crawl_date = %s"
        # Built once here rather than on every loop iteration.
        insert_query = """
            INSERT INTO daily_news (
                news_id, source_platform, title, url, crawl_date,
                rank_position, add_ts
            ) VALUES (%s, %s, %s, %s, %s, %s, %s)
        """

        in_transaction = False
        try:
            # The connection runs in autocommit mode; an explicit transaction
            # makes the delete + inserts atomic.
            self.connection.begin()
            in_transaction = True

            with self.connection.cursor() as cursor:
                # Overwrite mode: remove everything already stored for the day.
                deleted_count = cursor.execute(delete_query, (crawl_date,))
                if deleted_count > 0:
                    print(f"覆盖模式:删除了当天已有的 {deleted_count} 条新闻记录")

                saved_count = 0
                for news_item in news_data:
                    try:
                        source = news_item.get('source', 'unknown')
                        # Simplified news id: "<source>_<id>", falling back to
                        # the rank (or 0) when the item has no id.
                        news_id = f"{source}_{news_item.get('id', news_item.get('rank', 0))}"

                        cursor.execute(insert_query, (
                            news_id,
                            source,
                            news_item.get('title', ''),
                            news_item.get('url', ''),
                            crawl_date,
                            news_item.get('rank', None),
                            current_timestamp
                        ))
                        saved_count += 1

                    except Exception as e:
                        # Best effort: skip the bad item, keep the rest.
                        print(f"保存单条新闻失败: {e}")
                        continue

            self.connection.commit()
            print(f"成功保存 {saved_count} 条新闻记录")
            return saved_count

        except Exception as e:
            print(f"保存新闻数据失败: {e}")
            if in_transaction:
                try:
                    # Undo the delete so existing data for the day survives.
                    self.connection.rollback()
                except Exception as rollback_error:
                    print(f"回滚失败: {rollback_error}")
            return 0

    def get_daily_news(self, crawl_date: Optional[date] = None) -> List[Dict]:
        """Fetch the news rows stored for one day, ordered by rank position.

        Unlike the other query methods, database errors are not caught here;
        they propagate to the caller.

        Args:
            crawl_date: crawl date, defaults to today.

        Returns:
            List of row dictionaries (empty when nothing is stored).
        """
        if not crawl_date:
            crawl_date = date.today()

        query = """
            SELECT * FROM daily_news 
            WHERE crawl_date = %s 
            ORDER BY rank_position ASC
        """

        with self.connection.cursor() as cursor:
            cursor.execute(query, (crawl_date,))
            # fetchall() may hand back a tuple for an empty result set;
            # normalise so the declared list return type always holds.
            return list(cursor.fetchall())

    # ==================== Topic data operations ====================

    def save_daily_topics(self, keywords: List[str], summary: str, extract_date: Optional[date] = None) -> bool:
        """Save the topic analysis for one day, updating it if it exists.

        The keywords are stored as a JSON string (non-ASCII characters are
        kept as-is).

        Args:
            keywords: list of topic keywords.
            summary: news analysis summary text.
            extract_date: extraction date, defaults to today.

        Returns:
            True on success, False if any error occurred (the error is printed).
        """
        if not extract_date:
            extract_date = date.today()

        current_timestamp = int(datetime.now().timestamp())

        try:
            keywords_json = json.dumps(keywords, ensure_ascii=False)

            with self.connection.cursor() as cursor:
                # Is there already a record for this date?
                check_query = "SELECT id FROM daily_topics WHERE extract_date = %s"
                cursor.execute(check_query, (extract_date,))
                existing = cursor.fetchone()

                if existing:
                    # Update the existing record.
                    update_query = """
                        UPDATE daily_topics 
                        SET keywords = %s, summary = %s, add_ts = %s
                        WHERE extract_date = %s
                    """
                    cursor.execute(update_query, (keywords_json, summary, current_timestamp, extract_date))
                    print(f"更新了 {extract_date} 的话题分析")
                else:
                    # Insert a new record.
                    insert_query = """
                        INSERT INTO daily_topics (extract_date, keywords, summary, add_ts)
                        VALUES (%s, %s, %s, %s)
                    """
                    cursor.execute(insert_query, (extract_date, keywords_json, summary, current_timestamp))
                    print(f"保存了 {extract_date} 的话题分析")

            return True

        except Exception as e:
            print(f"保存话题分析失败: {e}")
            return False

    def get_daily_topics(self, extract_date: Optional[date] = None) -> Optional[Dict]:
        """Fetch the topic analysis stored for one day.

        Args:
            extract_date: extraction date, defaults to today.

        Returns:
            The row dictionary with ``keywords`` decoded from JSON (an empty
            list if the stored value is missing or malformed), or None when no
            record exists or the query failed (the error is printed).
        """
        if not extract_date:
            extract_date = date.today()

        try:
            with self.connection.cursor() as cursor:
                query = "SELECT * FROM daily_topics WHERE extract_date = %s"
                cursor.execute(query, (extract_date,))
                result = cursor.fetchone()

            if not result:
                return None

            result['keywords'] = self._parse_keywords(result['keywords'])
            return result

        except Exception as e:
            print(f"获取话题分析失败: {e}")
            return None

    def get_recent_topics(self, days: int = 7) -> List[Dict]:
        """Fetch the topic analyses of the most recent days, newest first.

        Rows are selected with ``extract_date >= CURDATE() - days``, so the
        window covers today plus the ``days`` preceding dates.

        Args:
            days: how many days to look back from the current date.

        Returns:
            List of row dictionaries with ``keywords`` decoded from JSON; a
            row with a missing or malformed value gets an empty keyword list
            instead of invalidating the whole result. An empty list is
            returned if the query failed (the error is printed).
        """
        try:
            with self.connection.cursor() as cursor:
                query = """
                    SELECT * FROM daily_topics 
                    WHERE extract_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
                    ORDER BY extract_date DESC
                """
                cursor.execute(query, (days,))
                results = list(cursor.fetchall())

            # Decode the JSON keyword column of every row.
            for result in results:
                result['keywords'] = self._parse_keywords(result['keywords'])

            return results

        except Exception as e:
            print(f"获取最近话题分析失败: {e}")
            return []

    # ==================== Statistics queries ====================

    def get_summary_stats(self, days: int = 7) -> Dict:
        """Return per-day statistics for news and topics over recent days.

        Args:
            days: how many days to look back from the current date.

        Returns:
            Dictionary with two keys:
            ``news_stats`` — per crawl date, the number of news rows and the
            number of distinct source platforms;
            ``topics_stats`` — per extraction date, the stored ``keywords``
            value (left undecoded) and the character length of the summary.
            Both lists are empty if a query failed (the error is printed).
        """
        try:
            with self.connection.cursor() as cursor:
                # News statistics.
                news_query = """
                    SELECT 
                        crawl_date,
                        COUNT(*) as news_count,
                        COUNT(DISTINCT source_platform) as platforms_count
                    FROM daily_news 
                    WHERE crawl_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
                    GROUP BY crawl_date
                    ORDER BY crawl_date DESC
                """
                cursor.execute(news_query, (days,))
                news_stats = list(cursor.fetchall())

                # Topic statistics.
                topics_query = """
                    SELECT 
                        extract_date,
                        keywords,
                        CHAR_LENGTH(summary) as summary_length
                    FROM daily_topics 
                    WHERE extract_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
                    ORDER BY extract_date DESC
                """
                cursor.execute(topics_query, (days,))
                topics_stats = list(cursor.fetchall())

            return {
                'news_stats': news_stats,
                'topics_stats': topics_stats
            }

        except Exception as e:
            print(f"获取统计摘要失败: {e}")
            return {'news_stats': [], 'topics_stats': []}

if __name__ == "__main__":
    # Manual smoke test: open a connection, read today's news and today's
    # topic analysis, and print what was found.
    with DatabaseManager() as manager:
        # Today's news rows.
        todays_news = manager.get_daily_news()
        print(f"今日新闻数量: {len(todays_news)}")

        # Today's topic analysis, if any has been stored.
        todays_topics = manager.get_daily_topics()
        if not todays_topics:
            print("今日暂无话题分析")
        else:
            print(f"今日话题关键词: {todays_topics['keywords']}")

        print("简化数据库管理器测试完成!")