You need to sign in or sign up before continuing.
main.py 12.1 KB
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
BroadTopicExtraction模块 - 主程序
整合话题提取的完整工作流程和命令行工具
"""

import sys
import asyncio
import argparse
from datetime import datetime, date
from pathlib import Path
from typing import List, Dict, Optional
from loguru import logger

# 添加项目根目录到路径
project_root = Path(__file__).parent.parent
sys.path.append(str(project_root))

try:
    from BroadTopicExtraction.get_today_news import NewsCollector, SOURCE_NAMES
    from BroadTopicExtraction.topic_extractor import TopicExtractor
    from BroadTopicExtraction.database_manager import DatabaseManager
except ImportError as e:
    logger.exception(f"导入模块失败: {e}")
    logger.error("请确保在项目根目录运行,并且已安装所有依赖")
    sys.exit(1)

class BroadTopicExtraction:
    """BroadTopicExtraction主要工作流程"""
    
    def __init__(self):
        """初始化"""
        self.news_collector = NewsCollector()
        self.topic_extractor = TopicExtractor()
        self.db_manager = DatabaseManager()
        
        logger.info("BroadTopicExtraction 初始化完成")
    
    def close(self):
        """关闭资源"""
        if self.news_collector:
            self.news_collector.close()
        if self.db_manager:
            self.db_manager.close()
    
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
    
    async def __aenter__(self):
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.close()
    
    async def run_daily_extraction(self, 
                                  news_sources: Optional[List[str]] = None,
                                  max_keywords: int = 100) -> Dict:
        """
        运行每日话题提取流程
        
        Args:
            news_sources: 新闻源列表,None表示使用所有支持的源
            max_keywords: 最大关键词数量
            
        Returns:
            包含完整提取结果的字典
        """
        extraction_result_message = ""
        extraction_result_message += "\nMindSpider AI爬虫 - 每日话题提取\n"
        extraction_result_message += f"执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
        extraction_result_message += f"目标日期: {date.today()}\n"
        
        if news_sources:
            extraction_result_message += f"指定平台: {len(news_sources)} 个\n"
            for source in news_sources:
                source_name = SOURCE_NAMES.get(source, source)
                extraction_result_message += f"  - {source_name}\n"
        else:
            extraction_result_message += f"爬取平台: 全部 {len(SOURCE_NAMES)} 个平台\n"
        
        extraction_result_message += f"关键词数: 最多 {max_keywords} 个\n"
        
        logger.info(extraction_result_message)
        
        extraction_result = {
            'success': False,
            'extraction_date': date.today().isoformat(),
            'start_time': datetime.now().isoformat(),
            'news_collection': {},
            'topic_extraction': {},
            'database_save': {},
            'error': None
        }
        
        try:
            # 步骤1: 收集新闻
            logger.info("【步骤1】收集热点新闻...")
            news_result = await self.news_collector.collect_and_save_news(
                sources=news_sources
            )
            
            extraction_result['news_collection'] = {
                'success': news_result['success'],
                'total_news': news_result.get('total_news', 0),
                'successful_sources': news_result.get('successful_sources', 0),
                'total_sources': news_result.get('total_sources', 0)
            }
            
            if not news_result['success'] or not news_result['news_list']:
                raise Exception("新闻收集失败或没有获取到新闻")
            
            # 步骤2: 提取关键词和生成总结
            logger.info("【步骤2】提取关键词和生成总结...")
            keywords, summary = self.topic_extractor.extract_keywords_and_summary(
                news_result['news_list'], 
                max_keywords=max_keywords
            )
            
            extraction_result['topic_extraction'] = {
                'success': len(keywords) > 0,
                'keywords_count': len(keywords),
                'keywords': keywords,
                'summary': summary
            }
            
            if not keywords:
                logger.warning("警告: 没有提取到有效关键词")
            
            # 步骤3: 保存到数据库
            logger.info("【步骤3】保存分析结果到数据库...")
            save_success = self.db_manager.save_daily_topics(
                keywords, summary, date.today()
            )
            
            extraction_result['database_save'] = {
                'success': save_success
            }
            
            extraction_result['success'] = True
            extraction_result['end_time'] = datetime.now().isoformat()
            
            logger.info("每日话题提取流程完成!")
            
            return extraction_result
            
        except Exception as e:
            logger.exception(f"话题提取流程失败: {e}")
            extraction_result['error'] = str(e)
            extraction_result['end_time'] = datetime.now().isoformat()
            return extraction_result
    
    def print_extraction_results(self, extraction_result: Dict):
        """打印提取结果"""
        extraction_result_message = ""
        
        # 新闻收集结果
        news_data = extraction_result.get('news_collection', {})
        extraction_result_message += f"\n📰 新闻收集: {news_data.get('total_news', 0)} 条新闻\n"
        extraction_result_message += f"   成功源数: {news_data.get('successful_sources', 0)}/{news_data.get('total_sources', 0)}\n"
        
        # 话题提取结果
        topic_data = extraction_result.get('topic_extraction', {})
        keywords = topic_data.get('keywords', [])
        summary = topic_data.get('summary', '')
        
        extraction_result_message += f"\n🔑 提取关键词: {len(keywords)} 个\n"
        if keywords:
            # 每行显示5个关键词
            for i in range(0, len(keywords), 5):
                keyword_group = keywords[i:i+5]
                extraction_result_message += f"   {', '.join(keyword_group)}\n"
        
        extraction_result_message += f"\n📝 新闻总结:\n   {summary}\n"
        
        # 数据库保存结果
        db_data = extraction_result.get('database_save', {})
        if db_data.get('success'):
            extraction_result_message += f"\n💾 数据库保存: 成功\n"
        else:
            extraction_result_message += f"\n💾 数据库保存: 失败\n"
        
        logger.info(extraction_result_message)
    
    def get_keywords_for_crawling(self, extract_date: date = None) -> List[str]:
        """
        获取用于爬取的关键词列表
        
        Args:
            extract_date: 提取日期,默认为今天
            
        Returns:
            关键词列表
        """
        try:
            # 从数据库获取话题分析
            topics_data = self.db_manager.get_daily_topics(extract_date)
            
            if not topics_data:
                logger.info(f"没有找到 {extract_date or date.today()} 的话题数据")
                return []
            
            keywords = topics_data['keywords']
            
            # 生成搜索关键词
            search_keywords = self.topic_extractor.get_search_keywords(keywords)
            
            logger.info(f"准备了 {len(search_keywords)} 个关键词用于爬取")
            return search_keywords
            
        except Exception as e:
            logger.error(f"获取爬取关键词失败: {e}")
            return []
    
    def get_daily_analysis(self, target_date: date = None) -> Optional[Dict]:
        """获取指定日期的分析结果"""
        try:
            return self.db_manager.get_daily_topics(target_date)
        except Exception as e:
            logger.error(f"获取每日分析失败: {e}")
            return None
    
    def get_recent_analysis(self, days: int = 7) -> List[Dict]:
        """获取最近几天的分析结果"""
        try:
            return self.db_manager.get_recent_topics(days)
        except Exception as e:
            logger.error(f"获取最近分析失败: {e}")
            return []

# ==================== 命令行工具 ====================

async def run_extraction_command(sources=None, keywords_count=100, show_details=True):
    """运行话题提取命令"""
    
    try:
        async with BroadTopicExtraction() as extractor:
            # 运行话题提取
            result = await extractor.run_daily_extraction(
                news_sources=sources,
                max_keywords=keywords_count
            )
            
            if result['success']:
                if show_details:
                    # 显示详细结果
                    extractor.print_extraction_results(result)
                else:
                    # 只显示简要结果
                    news_data = result.get('news_collection', {})
                    topic_data = result.get('topic_extraction', {})
                    
                    logger.info(f"✅ 话题提取成功完成!")
                    logger.info(f"   收集新闻: {news_data.get('total_news', 0)} 条")
                    logger.info(f"   提取关键词: {len(topic_data.get('keywords', []))} 个")
                    logger.info(f"   生成总结: {len(topic_data.get('summary', ''))} 字符")
                
                # 获取爬取关键词
                crawling_keywords = extractor.get_keywords_for_crawling()
                
                if crawling_keywords:
                    logger.info(f"\n🔑 为DeepSentimentCrawling准备的搜索关键词:")
                    logger.info(f"   {', '.join(crawling_keywords)}")
                    
                    # 保存关键词到文件
                    keywords_file = project_root / "data" / "daily_keywords.txt"
                    keywords_file.parent.mkdir(exist_ok=True)
                    
                    with open(keywords_file, 'w', encoding='utf-8') as f:
                        f.write('\n'.join(crawling_keywords))
                    
                    logger.info(f"   关键词已保存到: {keywords_file}")
                
                return True
                
            else:
                logger.error(f"❌ 话题提取失败: {result.get('error', '未知错误')}")
                return False
                
    except Exception as e:
        logger.error(f"❌ 执行过程中发生错误: {e}")
        return False

def main():
    """主函数"""
    parser = argparse.ArgumentParser(description="MindSpider每日话题提取工具")
    parser.add_argument("--sources", nargs="+", help="指定新闻源平台", 
                       choices=list(SOURCE_NAMES.keys()))
    parser.add_argument("--keywords", type=int, default=100, help="最大关键词数量 (默认100)")
    parser.add_argument("--quiet", action="store_true", help="简化输出模式")
    parser.add_argument("--list-sources", action="store_true", help="显示支持的新闻源")
    
    args = parser.parse_args()
    
    # 显示支持的新闻源
    if args.list_sources:
        logger.info("支持的新闻源平台:")
        for source, name in SOURCE_NAMES.items():
            logger.info(f"  {source:<25} {name}")
        return
    
    # 验证参数
    if args.keywords < 1 or args.keywords > 200:
        logger.error("关键词数量应在1-200之间")
        sys.exit(1)
    
    # 运行提取
    try:
        success = asyncio.run(run_extraction_command(
            sources=args.sources,
            keywords_count=args.keywords,
            show_details=not args.quiet
        ))
        
        sys.exit(0 if success else 1)
        
    except KeyboardInterrupt:
        logger.info("用户中断操作")
        sys.exit(1)

if __name__ == "__main__":
    main()