You need to sign in or sign up before continuing.
keyword_manager.py 11 KB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
DeepSentimentCrawling模块 - 关键词管理器
从BroadTopicExtraction模块获取关键词并分配给不同平台进行爬取
"""

import sys
import json
from datetime import date, timedelta, datetime
from pathlib import Path
from typing import List, Dict, Optional
import random
import pymysql
from pymysql.cursors import DictCursor

# 添加项目根目录到路径
project_root = Path(__file__).parent.parent
sys.path.append(str(project_root))

try:
    import config
except ImportError:
    raise ImportError("无法导入config.py配置文件")

class KeywordManager:
    """关键词管理器"""
    
    def __init__(self):
        """初始化关键词管理器"""
        self.connection = None
        self.connect()
    
    def connect(self):
        """连接数据库"""
        try:
            self.connection = pymysql.connect(
                host=config.DB_HOST,
                port=config.DB_PORT,
                user=config.DB_USER,
                password=config.DB_PASSWORD,
                database=config.DB_NAME,
                charset=config.DB_CHARSET,
                autocommit=True,
                cursorclass=DictCursor
            )
            print(f"关键词管理器成功连接到数据库: {config.DB_NAME}")
        except Exception as e:
            print(f"关键词管理器数据库连接失败: {e}")
            raise
    
    def get_latest_keywords(self, target_date: date = None, max_keywords: int = 100) -> List[str]:
        """
        获取最新的关键词列表
        
        Args:
            target_date: 目标日期,默认为今天
            max_keywords: 最大关键词数量
        
        Returns:
            关键词列表
        """
        if not target_date:
            target_date = date.today()
        
        print(f"正在获取 {target_date} 的关键词...")
        
        # 首先尝试获取指定日期的关键词
        topics_data = self.get_daily_topics(target_date)
        
        if topics_data and topics_data.get('keywords'):
            keywords = topics_data['keywords']
            print(f"成功获取 {target_date} 的 {len(keywords)} 个关键词")
            
            # 如果关键词太多,随机选择指定数量
            if len(keywords) > max_keywords:
                keywords = random.sample(keywords, max_keywords)
                print(f"随机选择了 {max_keywords} 个关键词")
            
            return keywords
        
        # 如果没有当天的关键词,尝试获取最近几天的
        print(f"{target_date} 没有关键词数据,尝试获取最近的关键词...")
        recent_topics = self.get_recent_topics(days=7)
        
        if recent_topics:
            # 合并最近几天的关键词
            all_keywords = []
            for topic in recent_topics:
                if topic.get('keywords'):
                    all_keywords.extend(topic['keywords'])
            
            # 去重并限制数量
            unique_keywords = list(set(all_keywords))
            if len(unique_keywords) > max_keywords:
                unique_keywords = random.sample(unique_keywords, max_keywords)
            
            print(f"从最近7天的数据中获取到 {len(unique_keywords)} 个关键词")
            return unique_keywords
        
        # 如果都没有,返回默认关键词
        print("没有找到任何关键词数据,使用默认关键词")
        return self._get_default_keywords()
    
    def get_daily_topics(self, extract_date: date = None) -> Optional[Dict]:
        """
        获取每日话题分析
        
        Args:
            extract_date: 提取日期,默认为今天
        
        Returns:
            话题分析数据,如果不存在返回None
        """
        if not extract_date:
            extract_date = date.today()
        
        try:
            cursor = self.connection.cursor()
            query = "SELECT * FROM daily_topics WHERE extract_date = %s"
            cursor.execute(query, (extract_date,))
            result = cursor.fetchone()
            
            if result:
                # 解析关键词JSON
                result['keywords'] = json.loads(result['keywords'])
                return result
            else:
                return None
                
        except Exception as e:
            print(f"获取话题分析失败: {e}")
            return None
    
    def get_recent_topics(self, days: int = 7) -> List[Dict]:
        """
        获取最近几天的话题分析
        
        Args:
            days: 天数
        
        Returns:
            话题分析列表
        """
        try:
            cursor = self.connection.cursor()
            query = """
                SELECT * FROM daily_topics 
                WHERE extract_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
                ORDER BY extract_date DESC
            """
            cursor.execute(query, (days,))
            results = cursor.fetchall()
            
            # 解析每个结果的关键词JSON
            for result in results:
                result['keywords'] = json.loads(result['keywords'])
            
            return results
            
        except Exception as e:
            print(f"获取最近话题分析失败: {e}")
            return []
    
    def _get_default_keywords(self) -> List[str]:
        """获取默认关键词列表"""
        return [
            "科技", "人工智能", "AI", "编程", "互联网",
            "创业", "投资", "理财", "股市", "经济",
            "教育", "学习", "考试", "大学", "就业",
            "健康", "养生", "运动", "美食", "旅游",
            "时尚", "美妆", "购物", "生活", "家居",
            "电影", "音乐", "游戏", "娱乐", "明星",
            "新闻", "热点", "社会", "政策", "环保"
        ]
    
    def get_all_keywords_for_platforms(self, platforms: List[str], target_date: date = None, 
                                      max_keywords: int = 100) -> List[str]:
        """
        为所有平台获取相同的关键词列表
        
        Args:
            platforms: 平台列表
            target_date: 目标日期
            max_keywords: 最大关键词数量
        
        Returns:
            关键词列表(所有平台共用)
        """
        keywords = self.get_latest_keywords(target_date, max_keywords)
        
        if keywords:
            print(f"为 {len(platforms)} 个平台准备了相同的 {len(keywords)} 个关键词")
            print(f"每个关键词将在所有平台上进行爬取")
        
        return keywords
    
    def get_keywords_for_platform(self, platform: str, target_date: date = None, 
                                max_keywords: int = 50) -> List[str]:
        """
        为特定平台获取关键词(现在所有平台使用相同关键词)
        
        Args:
            platform: 平台名称
            target_date: 目标日期
            max_keywords: 最大关键词数量
        
        Returns:
            关键词列表(与其他平台相同)
        """
        keywords = self.get_latest_keywords(target_date, max_keywords)
        
        print(f"为平台 {platform} 准备了 {len(keywords)} 个关键词(与其他平台相同)")
        return keywords
    
    def _filter_keywords_by_platform(self, keywords: List[str], platform: str) -> List[str]:
        """
        根据平台特性过滤关键词
        
        Args:
            keywords: 原始关键词列表
            platform: 平台名称
        
        Returns:
            过滤后的关键词列表
        """
        # 平台特性关键词映射(可以根据需要调整)
        platform_preferences = {
            'xhs': ['美妆', '时尚', '生活', '美食', '旅游', '购物', '健康', '养生'],
            'dy': ['娱乐', '音乐', '舞蹈', '搞笑', '美食', '生活', '科技', '教育'],
            'ks': ['生活', '搞笑', '农村', '美食', '手工', '音乐', '娱乐'],
            'bili': ['科技', '游戏', '动漫', '学习', '编程', '数码', '科普'],
            'wb': ['热点', '新闻', '娱乐', '明星', '社会', '时事', '科技'],
            'tieba': ['游戏', '动漫', '学习', '生活', '兴趣', '讨论'],
            'zhihu': ['知识', '学习', '科技', '职场', '投资', '教育', '思考']
        }
        
        # 如果平台有特定偏好,优先选择相关关键词
        preferred_keywords = platform_preferences.get(platform, [])
        
        if preferred_keywords:
            # 先选择平台偏好的关键词
            filtered = []
            remaining = []
            
            for keyword in keywords:
                if any(pref in keyword for pref in preferred_keywords):
                    filtered.append(keyword)
                else:
                    remaining.append(keyword)
            
            # 如果偏好关键词不够,补充其他关键词
            if len(filtered) < len(keywords) // 2:
                filtered.extend(remaining[:len(keywords) - len(filtered)])
            
            return filtered
        
        # 如果没有特定偏好,返回原关键词
        return keywords
    
    def get_crawling_summary(self, target_date: date = None) -> Dict:
        """
        获取爬取任务摘要
        
        Args:
            target_date: 目标日期
        
        Returns:
            爬取摘要信息
        """
        if not target_date:
            target_date = date.today()
        
        topics_data = self.get_daily_topics(target_date)
        
        if topics_data:
            return {
                'date': target_date,
                'keywords_count': len(topics_data.get('keywords', [])),
                'summary': topics_data.get('summary', ''),
                'has_data': True
            }
        else:
            return {
                'date': target_date,
                'keywords_count': 0,
                'summary': '暂无数据',
                'has_data': False
            }
    
    def close(self):
        """关闭数据库连接"""
        if self.connection:
            self.connection.close()
            print("关键词管理器数据库连接已关闭")
    
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

if __name__ == "__main__":
    # 测试关键词管理器
    with KeywordManager() as km:
        # 测试获取关键词
        keywords = km.get_latest_keywords(max_keywords=20)
        print(f"获取到的关键词: {keywords}")
        
        # 测试平台分配
        platforms = ['xhs', 'dy', 'bili']
        distribution = km.distribute_keywords_by_platform(keywords, platforms)
        for platform, kws in distribution.items():
            print(f"{platform}: {kws}")
        
        # 测试爬取摘要
        summary = km.get_crawling_summary()
        print(f"爬取摘要: {summary}")
        
        print("关键词管理器测试完成!")