Asy0y0
Committed by GitHub

Added dynamic scheduling functionality to adjust crawler execution intervals based on crawl duration and data volume.
Showing 1 changed file with 95 additions and 6 deletions
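The interesting piece of this change is the scheduling pattern: rather than a fixed `interval` trigger, each run finishes by registering a one-shot `date` job for its own next execution, so the delay can be recomputed every cycle. A minimal sketch of that pattern with APScheduler (the job name and the placeholder delay are illustrative, not the project's exact code):

```python
from datetime import datetime, timedelta

from apscheduler.schedulers.background import BackgroundScheduler
from pytz import utc

scheduler = BackgroundScheduler(timezone=utc)

def self_scheduling_job():
    # ... do the real work here and measure it ...
    next_interval = 5 * 60 * 60  # placeholder: derive from duration/volume at runtime

    # Re-register this function as a one-shot 'date' job; replace_existing
    # guards against a ConflictingIdError if the id is still registered.
    scheduler.add_job(
        self_scheduling_job,
        'date',
        run_date=datetime.now(utc) + timedelta(seconds=next_interval),
        id='self_scheduling_job',
        replace_existing=True,
    )

scheduler.start()
self_scheduling_job()  # the first call schedules every later one
```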
@@ -7,6 +7,8 @@ import subprocess
 from flask import Flask, session, request, redirect, render_template
 from apscheduler.schedulers.background import BackgroundScheduler
 from pytz import utc
+from datetime import datetime, timedelta
+import time

 # Initialize logging
 logging.basicConfig(
@@ -153,11 +155,90 @@ def run_script():
     # Run all the scripts
     for script_name, script_path in scripts:
         try:
-            print(f"Running {script_name}...")  # Print a message as the run starts
+            logging.info(f"Running {script_name}...")
             subprocess.run(['python', script_path], check=True)  # Execute the script via subprocess
-            print(f"{script_name} finished successfully.")  # Print a success message
+            logging.info(f"{script_name} finished successfully.")
         except subprocess.CalledProcessError as e:
-            print(f"An error occurred while running {script_name}: {e}")  # Print the error message
+            logging.error(f"An error occurred while running {script_name}: {e}")
+
+# New feature: dynamically schedule the crawler scripts
+def check_database_empty():
+    """
+    Check whether the target table in the database is empty.
+
+    :return: True if the table is empty, False otherwise
+    """
+    try:
+        connection = pymysql.connect(**DB_CONFIG)
+        with connection.cursor() as cursor:
+            cursor.execute("SELECT COUNT(*) as count FROM article")
+            result = cursor.fetchone()
+            count = result[0] if result else 0  # default pymysql cursor returns tuples
+            logging.info(f"The database holds {count} records.")
+            return count == 0
+    except pymysql.MySQLError as e:
+        logging.error(f"Database check failed: {e}")
+        return True  # Assume an empty database on connection failure so startup is not blocked
+    finally:
+        if 'connection' in locals():
+            connection.close()
+
+def dynamic_crawl():
+    """
+    Run the crawl, then schedule the next run based on how long the crawl
+    took and how much data it fetched.
+    """
+    try:
+        start_time = time.time()
+        logging.info("Starting the crawl.")
+
+        run_script()  # Run the crawler scripts
+
+        end_time = time.time()
+        duration = end_time - start_time  # Crawl duration in seconds
+
+        # Use the post-crawl record count in the database as the data volume
+        try:
+            connection = pymysql.connect(**DB_CONFIG)
+            with connection.cursor() as cursor:
+                cursor.execute("SELECT COUNT(*) as count FROM article")
+                result = cursor.fetchone()
+                data_fetched = result[0] if result else 0  # default pymysql cursor returns tuples
+                logging.info(f"Crawl finished in {duration:.2f} s; the database now holds {data_fetched} records.")
+        except pymysql.MySQLError as e:
+            logging.error(f"Failed to read the record count: {e}")
+            data_fetched = 0
+        finally:
+            if 'connection' in locals():
+                connection.close()
+
+        # Adjust the next crawl time based on crawl duration and data volume
+        base_interval = 5 * 60 * 60  # Base interval of 5 hours, in seconds
+
+        if duration > 3600:  # The crawl took more than an hour
+            next_interval = base_interval + duration
+            logging.info(f"Long crawl detected. Next crawl in {next_interval/3600:.2f} hours.")
+        elif data_fetched < 50:  # Fewer than 50 records fetched
+            next_interval = base_interval / 2
+            logging.info(f"Little data fetched. Next crawl in {next_interval/60:.2f} minutes.")
+        else:
+            next_interval = base_interval
+            logging.info(f"Standard crawl finished. Next crawl in {next_interval/3600:.2f} hours.")
+
+        # Schedule the next crawl; an aware UTC timestamp matches the scheduler's timezone
+        scheduler.add_job(dynamic_crawl, 'date', run_date=datetime.now(utc) + timedelta(seconds=next_interval), id='dynamic_crawl', replace_existing=True)
+
+    except Exception as e:
+        logging.error(f"Error during the dynamic crawl: {e}")
+
+# Database configuration for the dynamic scheduling feature
+DB_CONFIG = {
+    'host': 'localhost',
+    'user': 'root',
+    'password': '12345678',
+    'database': 'Weibo_PublicOpinion_AnalysisSystem',
+    'port': 3306,
+    'charset': 'utf8mb4'
+}

 # Main program entry point
 if __name__ == '__main__':
@@ -174,11 +255,19 @@ if __name__ == '__main__':
     connection.close()
     logging.info("Database connection closed.")

-    # Set up a scheduled task that runs the crawler scripts periodically
+    # Set up the scheduled task that runs the crawler scripts dynamically
     scheduler = BackgroundScheduler(timezone=utc)  # Create the background scheduler
-    scheduler.add_job(run_script, 'interval', hours=5)  # Run the crawler scripts every 5 hours
     scheduler.start()  # Start the scheduler
-
+
+    # Initial scheduling: crawl immediately if the database is empty; otherwise schedule the first crawl after the base interval
+    if check_database_empty():
+        logging.info("Database is empty. Starting the initial crawl immediately.")
+        dynamic_crawl()
+    else:
+        logging.info("Database already has data. Scheduling the first crawl.")
+        base_interval = 5 * 60 * 60  # 5 hours
+        scheduler.add_job(dynamic_crawl, 'date', run_date=datetime.now(utc) + timedelta(seconds=base_interval), id='dynamic_crawl')
+
     try:
         app.run()  # Start the Flask app
     finally:
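One review note on the `COUNT(*)` reads above: they index the row positionally (`result[0]`) because pymysql's default cursor returns rows as tuples; dict-style access like `result['count']` only works when the connection uses `pymysql.cursors.DictCursor`. A small sketch of both access styles (the connection settings are placeholders, not the project's real credentials):

```python
import pymysql
import pymysql.cursors

# Placeholder connection settings; substitute the real DB_CONFIG values.
config = {'host': 'localhost', 'user': 'root', 'password': '...', 'database': 'demo'}

# Default cursor: rows come back as tuples, so index positionally.
connection = pymysql.connect(**config)
try:
    with connection.cursor() as cursor:
        cursor.execute("SELECT COUNT(*) AS count FROM article")
        row = cursor.fetchone()
        count = row[0] if row else 0
finally:
    connection.close()

# DictCursor: rows come back as dicts keyed by column name or alias.
connection = pymysql.connect(cursorclass=pymysql.cursors.DictCursor, **config)
try:
    with connection.cursor() as cursor:
        cursor.execute("SELECT COUNT(*) AS count FROM article")
        row = cursor.fetchone()
        count = row['count'] if row else 0
finally:
    connection.close()
```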