# spider_control.py (6.2 KB)
from flask import Blueprint, jsonify, request, render_template
import json
import os
from datetime import datetime
import threading
from queue import Queue
import asyncio
import websockets
import logging
from spider.spiderData import SpiderData

# Blueprint under which the spider-control routes in this module are registered.
spider_bp = Blueprint('spider', __name__)

# Module logger; its threshold is set to INFO.
logger = logging.getLogger('spider_control')
logger.setLevel(logging.INFO)

# Set of registered WebSocket connections that broadcast_message() sends to.
websocket_connections = set()

# Message queue (created here; nothing in the visible code reads or writes it).
message_queue = Queue()

# Default crawl parameters, used as the fallback when no configuration file or
# request parameters are available.
DEFAULT_CONFIG = {
    'crawlDepth': 3,
    'interval': 5,
    'maxRetries': 3,
    'timeout': 30
}

def load_config():
    """Load the spider configuration from ``../spider/config.json``.

    The path is resolved relative to this module. Keys missing from the file
    are filled in from ``DEFAULT_CONFIG``; keys present in the file take
    precedence.

    Returns:
        dict: A new dictionary on every call, so callers may mutate the
        result without affecting ``DEFAULT_CONFIG``. When the file is
        missing, unreadable, not valid JSON, or does not hold a JSON object,
        a copy of the defaults is returned.
    """
    config_path = os.path.join(os.path.dirname(__file__), '../spider/config.json')
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
    except FileNotFoundError:
        # No configuration has been saved yet: not an error, use the defaults.
        return dict(DEFAULT_CONFIG)
    except (OSError, ValueError) as e:
        # ValueError covers both malformed JSON and undecodable bytes.
        logger.error(f"加载配置文件失败: {e}")
        return dict(DEFAULT_CONFIG)

    if not isinstance(loaded, dict):
        # e.g. the file contains `null` or a list; it cannot be used as a config.
        logger.error(f"加载配置文件失败: 配置内容不是JSON对象 ({type(loaded).__name__})")
        return dict(DEFAULT_CONFIG)

    # Overlay the stored values on the defaults so every expected key exists.
    return {**DEFAULT_CONFIG, **loaded}

def save_config(config):
    """Write the spider configuration to ``../spider/config.json``.

    The path is resolved relative to this module.

    Args:
        config: JSON-serializable configuration object.

    Returns:
        bool: ``True`` when the file was written, ``False`` when
        serialization or writing failed (the error is logged).
    """
    config_path = os.path.join(os.path.dirname(__file__), '../spider/config.json')
    try:
        # Serialize before opening: mode 'w' truncates the file immediately, so
        # a serialization error after that point would wipe the saved config.
        payload = json.dumps(config, ensure_ascii=False, indent=4)
        with open(config_path, 'w', encoding='utf-8') as f:
            f.write(payload)
        return True
    except (OSError, TypeError, ValueError) as e:
        logger.error(f"保存配置文件失败: {e}")
        return False

async def broadcast_message(message):
    """Send ``message`` as JSON to every registered WebSocket connection.

    Connections whose send fails are dropped from ``websocket_connections``.
    A message that cannot be encoded as JSON is logged and not sent; no
    connection is dropped in that case.

    Args:
        message: JSON-serializable payload.
    """
    if not websocket_connections:
        return

    # Encode once: the payload is identical for every recipient, and an
    # encoding failure must not be mistaken for a per-connection send failure.
    try:
        payload = json.dumps(message)
    except (TypeError, ValueError) as e:
        logger.error(f"发送WebSocket消息失败: {e}")
        return

    # Iterate over a snapshot because failed connections are removed from the
    # set inside the loop.
    for websocket in websocket_connections.copy():
        try:
            await websocket.send(payload)
        except websockets.exceptions.ConnectionClosed:
            # discard(), not remove(): the connection's own handler may have
            # already unregistered it, and remove() would raise KeyError.
            websocket_connections.discard(websocket)
        except Exception as e:
            logger.error(f"发送WebSocket消息失败: {e}")
            websocket_connections.discard(websocket)

def _notify_clients(payload):
    """Broadcast ``payload`` to the WebSocket clients from synchronous code."""
    asyncio.run(broadcast_message(payload))


def spider_worker(topics, parameters):
    """Crawl each topic in turn and report progress over WebSocket.

    Intended to run in a worker thread. A failure while crawling one topic is
    reported as a log message and does not stop the remaining topics.

    Args:
        topics: Sequence of topics to crawl.
        parameters: Mapping that may contain ``crawlDepth``, ``interval``,
            ``maxRetries`` and ``timeout``. Missing keys (or ``None``) fall
            back to ``DEFAULT_CONFIG``.
    """
    total_topics = len(topics)

    try:
        # Fill in any option the caller left out, so a partial parameter set
        # does not make every topic fail with KeyError.
        settings = {**DEFAULT_CONFIG, **(parameters or {})}
        spider = SpiderData()

        # Progress is based on the number of topics already processed
        # (successfully or not), so a failed topic does not stall the bar.
        for processed, topic in enumerate(topics):
            try:
                _notify_clients({
                    'type': 'progress',
                    'value': int((processed / total_topics) * 100)
                })
                _notify_clients({
                    'type': 'log',
                    'message': f'开始爬取话题: {topic}'
                })

                spider.crawl_topic(
                    topic=topic,
                    depth=settings['crawlDepth'],
                    interval=settings['interval'],
                    max_retries=settings['maxRetries'],
                    timeout=settings['timeout']
                )

                _notify_clients({
                    'type': 'log',
                    'message': f'话题 {topic} 爬取完成'
                })

            except Exception as e:
                # Report the failure and carry on with the next topic.
                _notify_clients({
                    'type': 'log',
                    'message': f'爬取话题 {topic} 时出错: {str(e)}'
                })

        # Final progress update followed by the completion message.
        _notify_clients({
            'type': 'progress',
            'value': 100
        })
        _notify_clients({
            'type': 'log',
            'message': '所有话题爬取完成'
        })

    except Exception as e:
        # Anything outside the per-topic handling (e.g. spider construction).
        _notify_clients({
            'type': 'log',
            'message': f'爬虫任务执行出错: {str(e)}'
        })

@spider_bp.route('/spider/control')
def spider_control():
    """Serve the spider control page."""
    template_name = 'spider_control.html'
    return render_template(template_name)

@spider_bp.route('/api/spider/start', methods=['POST'])
def start_spider():
    """Start a crawl in a background thread.

    Expects a JSON object body with:
        topics: non-empty list of topics to crawl.
        parameters: optional object overriding entries of ``DEFAULT_CONFIG``.

    Returns:
        A JSON response ``{'success': bool, 'message': str}``. Validation
        failures are reported through ``success: False`` in the body; the
        HTTP status is left at its default, as before.
    """
    try:
        # silent=True yields None for a missing or malformed JSON body instead
        # of raising, so it can be reported with a clear message below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({
                'success': False,
                'message': '请求体必须是JSON对象'
            })

        topics = data.get('topics', [])
        if not topics:
            return jsonify({
                'success': False,
                'message': '请选择至少一个话题'
            })
        if not isinstance(topics, list):
            # A bare string would otherwise be crawled character by character.
            return jsonify({
                'success': False,
                'message': 'topics 必须是列表'
            })

        overrides = data.get('parameters') or {}
        if not isinstance(overrides, dict):
            return jsonify({
                'success': False,
                'message': 'parameters 必须是JSON对象'
            })
        # Build a fresh dict: fills in omitted options and keeps the shared
        # DEFAULT_CONFIG object out of the worker thread.
        parameters = {**DEFAULT_CONFIG, **overrides}

        # Daemon thread: it does not keep the process alive on shutdown.
        thread = threading.Thread(
            target=spider_worker,
            args=(topics, parameters),
            daemon=True
        )
        thread.start()

        return jsonify({
            'success': True,
            'message': '爬虫任务已启动'
        })

    except Exception as e:
        logger.error(f"启动爬虫任务失败: {e}")
        return jsonify({
            'success': False,
            'message': str(e)
        })

@spider_bp.route('/api/spider/save-config', methods=['POST'])
def save_spider_config():
    """Persist the configuration sent in the request body.

    Expects a JSON object body, which is passed to ``save_config``.

    Returns:
        A JSON response ``{'success': bool, 'message': str}``.
    """
    try:
        # silent=True yields None for a missing or malformed JSON body.
        config = request.get_json(silent=True)
        if not isinstance(config, dict):
            # Refuse to overwrite the stored config with null, a list, etc.
            return jsonify({
                'success': False,
                'message': '配置格式无效，必须是JSON对象'
            })

        if not save_config(config):
            return jsonify({
                'success': False,
                'message': '配置保存失败'
            })

        return jsonify({
            'success': True,
            'message': '配置保存成功'
        })
    except Exception as e:
        logger.error(f"保存配置失败: {e}")
        return jsonify({
            'success': False,
            'message': str(e)
        })

# NOTE(review): `spider_bp` is a flask.Blueprint; Flask's Blueprint is not known
# to provide a `websocket` decorator (Quart's Blueprint does). Confirm which
# framework is intended, otherwise this line fails when the module is imported.
@spider_bp.websocket('/ws/spider-status')
async def spider_status_socket():
    """Register a WebSocket connection and keep it alive with periodic pings.

    The connection is added to ``websocket_connections`` so that
    ``broadcast_message`` can reach it, pinged every 30 seconds, and
    unregistered when the loop ends. Errors are logged, not raised.
    """
    try:
        # NOTE(review): this constructs a new protocol object instead of
        # obtaining the connection accepted for the current request — confirm
        # how the real client connection is supposed to reach this handler.
        websocket = websockets.WebSocketServerProtocol()
        websocket_connections.add(websocket)

        try:
            while True:
                # Keep the connection alive.
                await websocket.ping()
                await asyncio.sleep(30)
        except websockets.exceptions.ConnectionClosed:
            # Peer went away: normal termination of the keep-alive loop.
            pass
        finally:
            # discard(), not remove(): broadcast_message() may already have
            # dropped this connection, and remove() would raise KeyError and
            # be logged below as a spurious handler failure.
            websocket_connections.discard(websocket)
    except Exception as e:
        logger.error(f"WebSocket连接处理失败: {e}")