戒酒的李白
Committed by GitHub

Merge pull request #14 from tianjy12/main

Removed unnecessary introduction and optimized the word cloud generation component. Added dynamic scheduling functionality to adjust crawler execution intervals based on crawl duration and data volume.
### 项目中的大部分UI元素使用该模板搭建
模板链接:https://iqonic.design/product/admin-templates/datum-crm-admin-deshboard-template/
\ No newline at end of file
2025-01-09 23:29:06,246 [INFO] 尝试连接到数据库: root@localhost:3306/Weibo_PublicOpinion_AnalysisSystem
2025-01-09 23:29:06,346 [ERROR] 数据库连接失败: (1045, "Access denied for user 'root'@'localhost' (using password: YES)")
... ... @@ -7,6 +7,8 @@ import subprocess
from flask import Flask, session, request, redirect, render_template
from apscheduler.schedulers.background import BackgroundScheduler
from pytz import utc
from datetime import datetime, timedelta
import time
# 初始化日志记录
logging.basicConfig(
... ... @@ -153,11 +155,90 @@ def run_script():
# 执行所有脚本
for script_name, script_path in scripts:
try:
print(f"Running {script_name}...") # 打印运行开始的信息
logging.info(f"Running {script_name}...")
subprocess.run(['python', script_path], check=True) # 使用 subprocess 执行脚本
print(f"{script_name} finished successfully.") # 打印脚本成功完成的消息
logging.info(f"{script_name} finished successfully.")
except subprocess.CalledProcessError as e:
print(f"An error occurred while running {script_name}: {e}") # 打印错误信息
logging.error(f"An error occurred while running {script_name}: {e}")
# 新增功能:动态调度爬虫脚本
def check_database_empty():
    """
    Report whether the ``article`` table currently holds no rows.

    A short-lived connection is opened with ``DB_CONFIG`` and a
    ``SELECT COUNT(*)`` is issued against ``article``.

    :return: True if the table has no rows or the check itself failed
             with a MySQL error; False otherwise.
    """
    connection = None
    try:
        connection = pymysql.connect(**DB_CONFIG)
        with connection.cursor() as cursor:
            cursor.execute("SELECT COUNT(*) as count FROM article")
            result = cursor.fetchone()

        # pymysql's default cursor returns rows as tuples, DictCursor returns
        # dicts. Accept both shapes so a tuple row is not misread as "0 rows".
        if not result:
            count = 0
        elif isinstance(result, dict):
            count = result.get('count') or 0
        else:
            count = result[0] or 0

        logging.info(f"数据库中共有 {count} 条记录。")
        return count == 0
    except pymysql.MySQLError as e:
        logging.error(f"检查数据库失败: {e}")
        # A failed check is treated as "empty" so the caller is not blocked.
        return True
    finally:
        if connection is not None:
            connection.close()
def dynamic_crawl():
    """
    Run the crawler scripts once, then schedule the next run.

    The delay before the next run depends on how long this crawl took and on
    how many rows the ``article`` table holds afterwards (see
    ``_choose_next_interval``). If the crawl itself raises, the error is
    logged and the next run is still scheduled after the base interval, so a
    single failure does not end the crawl chain.

    :return: None
    """
    base_interval = 5 * 60 * 60  # base interval: 5 hours, in seconds
    next_interval = base_interval

    try:
        start_time = time.time()
        logging.info("开始爬取数据。")
        run_script()  # run the crawler scripts synchronously
        duration = time.time() - start_time

        # Row count of ``article`` after the crawl is used as the data volume.
        data_fetched = _count_article_rows()
        if data_fetched is None:
            data_fetched = 0
        else:
            logging.info(f"爬取完成,耗时 {duration:.2f} 秒,数据库中共有 {data_fetched} 条记录。")

        next_interval = _choose_next_interval(duration, data_fetched, base_interval)
    except Exception as e:
        logging.error(f"动态爬取过程中发生错误: {e}")

    try:
        # The scheduler is created with timezone=utc and APScheduler reads a
        # naive run_date in the scheduler's timezone, so an aware UTC timestamp
        # is used to keep the delay independent of the local UTC offset.
        scheduler.add_job(
            dynamic_crawl,
            'date',
            run_date=datetime.now(utc) + timedelta(seconds=next_interval),
            id='dynamic_crawl',
            replace_existing=True,  # never fail on a leftover job with the same id
        )
    except Exception as e:
        logging.error(f"安排下次爬取任务失败: {e}")


def _count_article_rows():
    """Return the row count of ``article``, or None if the query failed."""
    connection = None
    try:
        connection = pymysql.connect(**DB_CONFIG)
        with connection.cursor() as cursor:
            cursor.execute("SELECT COUNT(*) as count FROM article")
            row = cursor.fetchone()
        # Rows are tuples with the default cursor and dicts with DictCursor.
        if not row:
            return 0
        if isinstance(row, dict):
            return row.get('count') or 0
        return row[0] or 0
    except pymysql.MySQLError as e:
        logging.error(f"获取数据量失败: {e}")
        return None
    finally:
        if connection is not None:
            connection.close()


def _choose_next_interval(duration, data_fetched, base_interval):
    """Return the delay in seconds before the next crawl and log the choice."""
    if duration > 3600:  # the crawl took longer than one hour
        next_interval = base_interval + duration
        logging.info(f"检测到长时间爬取。下次爬取将在 {next_interval/3600:.2f} 小时后执行。")
    elif data_fetched < 50:  # fewer than 50 rows available
        next_interval = base_interval / 2
        logging.info(f"获取数据量较少。下次爬取将在 {next_interval/60:.2f} 分钟后执行。")
    else:
        next_interval = base_interval
        logging.info(f"标准爬取完成。下次爬取将在 {next_interval/3600:.2f} 小时后执行。")
    return next_interval
# Database connection settings used by the dynamic scheduling helpers;
# the mapping is unpacked into pymysql.connect(**DB_CONFIG).
# NOTE(review): credentials are hard-coded here - consider loading them from
# the environment or a config file that is not committed.
DB_CONFIG = dict(
    host='localhost',
    user='root',
    password='12345678',
    database='Weibo_PublicOpinion_AnalysisSystem',
    port=3306,
    charset='utf8mb4',
)
# 主程序入口
if __name__ == '__main__':
... ... @@ -174,11 +255,19 @@ if __name__ == '__main__':
connection.close()
logging.info("数据库连接已关闭。")
# 设置定时任务,定期执行爬虫脚本
# 设置定时任务,动态执行爬虫脚本
scheduler = BackgroundScheduler(timezone=utc) # 创建后台任务调度器
scheduler.add_job(run_script, 'interval', hours=5) # 每5小时执行一次爬虫脚本
scheduler.start() # 启动调度器
# 初始化调度:如果数据库为空,立即爬取;否则,按照基础时间间隔安排首次爬取
if check_database_empty():
logging.info("数据库为空。立即开始初始爬取。")
dynamic_crawl()
else:
logging.info("数据库已有数据。安排首次爬取。")
base_interval = 5 * 60 * 60 # 5小时
scheduler.add_job(dynamic_crawl, 'date', run_date=datetime.now() + timedelta(seconds=base_interval), id='dynamic_crawl')
try:
app.run() # 启动 Flask 应用
finally:
... ...
... ... @@ -5,95 +5,180 @@ import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
import pymysql
def stopWordList():
import logging
# Root logger setup: INFO and above, written both to the file
# wordcloud_generator.log and to a StreamHandler with its default stream.
logging.basicConfig(
    handlers=[logging.FileHandler("wordcloud_generator.log"), logging.StreamHandler()],
    format='%(asctime)s [%(levelname)s] %(message)s',
    level=logging.INFO,
)
# Global cache for stop words; filled by load_stop_words() on first successful load.
STOP_WORDS = set()


def load_stop_words():
    """
    Load the stop words from ``./model/stopWords.txt`` and cache them.

    The file is read as UTF-8, one entry per line; surrounding whitespace is
    stripped and blank lines are skipped. Once a non-empty set has been
    loaded, later calls return the cached set without touching the file.

    :return: set of stop words; an empty set if the file does not exist or
             cannot be read (the failure is logged, nothing is raised).
    """
    global STOP_WORDS
    if STOP_WORDS:
        return STOP_WORDS

    stop_words_path = './model/stopWords.txt'
    try:
        with open(stop_words_path, encoding='utf8') as f:
            loaded = {line.strip() for line in f if line.strip()}
    except FileNotFoundError:
        logging.error(f"Stop words file does not exist: {stop_words_path}")
        return set()
    except (OSError, UnicodeDecodeError) as e:
        logging.error(f"Failed to load stop words file: {e}")
        return STOP_WORDS

    # Only publish the result after the whole file was read successfully.
    STOP_WORDS = loaded
    logging.info(f"Loaded {len(STOP_WORDS)} stop words")
    return STOP_WORDS
def generate_word_cloud(text, mask_path, font_path, output_path):
    """
    Generate a word cloud and save it to output_path.

    Each stage (loading the mask, building the cloud, saving the image) logs
    its own failure and makes the function return early instead of raising.

    :param text: Processed text
    :param mask_path: Path to the mask image
    :param font_path: Path to the font file
    :param output_path: Path to save the generated word cloud image
    :return: None
    """
    # Stage 1: load the mask image into an array.
    try:
        # The context manager closes the image file once the pixel data
        # has been copied into the array.
        with Image.open(mask_path) as img:
            img_arr = np.array(img)
        logging.info(f"Successfully loaded mask image: {mask_path}")
    except FileNotFoundError:
        logging.error(f"Mask image file does not exist: {mask_path}")
        return
    except Exception as e:
        logging.error(f"Failed to load mask image: {e}")
        return

    # Stage 2: build the word cloud from the text.
    try:
        wc = WordCloud(
            background_color="#fff",
            mask=img_arr,
            font_path=font_path,
            max_words=2000,
            max_font_size=100,
            random_state=42,
            width=800,
            height=600,
        )
        wc.generate_from_text(text)
        logging.info("Word cloud generated successfully")
    except Exception as e:
        logging.error(f"Failed to generate word cloud: {e}")
        return

    # Stage 3: render and save the image.
    try:
        fig = plt.figure(figsize=(8, 6))
        try:
            plt.imshow(wc, interpolation='bilinear')
            plt.axis('off')
            plt.savefig(output_path, dpi=300, bbox_inches='tight')
        finally:
            # Close the figure even when rendering or saving fails, so
            # repeated calls do not accumulate open figures.
            plt.close(fig)
        logging.info(f"Word cloud saved to: {output_path}")
    except Exception as e:
        logging.error(f"Failed to save word cloud image: {e}")
def get_db_connection_interactive():
    """
    Interactively obtain database connection parameters from the terminal.
    Press Enter to use default values.

    A port that is not an integer in the range 1-65535 is logged as invalid
    and replaced by the default 3306.

    :return: pymysql.connections.Connection object
    :raises pymysql.MySQLError: re-raised after being logged when the
        connection attempt fails
    """
    print("Please enter database connection information (press Enter to use default values):")

    host = input(" 1. Host (default: localhost): ").strip() or "localhost"

    port_str = input(" 2. Port (default: 3306): ").strip() or "3306"
    try:
        port = int(port_str)
        if not 1 <= port <= 65535:
            raise ValueError(port_str)
    except ValueError:
        logging.error(f"Invalid port number: {port_str}")
        port = 3306

    user = input(" 3. Username (default: root): ").strip() or "root"
    # The password is deliberately not stripped: whitespace may be significant.
    # NOTE(review): the prompt echoes a default password - consider removing it.
    password = input(" 4. Password (default: 12345678): ") or "12345678"
    db_name = (
        input(" 5. Database name (default: Weibo_PublicOpinion_AnalysisSystem): ").strip()
        or "Weibo_PublicOpinion_AnalysisSystem"
    )

    logging.info(f"Attempting to connect to database: {user}@{host}:{port}/{db_name}")

    try:
        connection = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=db_name,
            port=port,
            charset='utf8mb4',
        )
    except pymysql.MySQLError as e:
        logging.error(f"Database connection failed: {e}")
        raise

    logging.info("Database connection successful")
    return connection
# 生成词云
def get_img(field, table_name, target_img_src, res_img_src, connection, font_path='STHUPO.TTF'):
    """
    Retrieve text data from a specified field and table in the database,
    perform word segmentation and stop word removal, then generate a word cloud.

    Query and text-processing failures are logged and end the call early
    without raising. Nothing is generated when the query yields no text or
    when no words remain after filtering.

    :param field: Database field name
    :param table_name: Database table name
    :param target_img_src: Path to the mask image
    :param res_img_src: Path to save the generated word cloud image
    :param connection: Established database connection
    :param font_path: Path to the font file
    :return: None
    """
    try:
        with connection.cursor() as cursor:
            # Identifiers cannot be bound as query parameters, so field and
            # table_name are interpolated directly; they must come from
            # trusted code, never from user input.
            sql = f'SELECT {field} FROM {table_name}'
            cursor.execute(sql)
            data = cursor.fetchall()
        logging.info(f"Fetched {len(data)} records from '{table_name}' table, field '{field}'")
    except pymysql.MySQLError as e:
        logging.error(f"Database query failed: {e}")
        return

    # Separate records with a newline so the end of one record and the start
    # of the next are not fused into a single token by the segmenter.
    text = '\n'.join(item[0] for item in data if item[0])
    if not text.strip():
        logging.warning(f"No text found in '{table_name}' table, field '{field}'; skipping word cloud generation")
        return

    # Tokenization & stop word removal
    try:
        stop_words = load_stop_words()
        if not stop_words:
            logging.warning("Stop words set is empty, proceeding without stop word removal")

        filtered_words = [
            word
            for word in jieba.cut(text)
            if word.strip() and word not in stop_words  # drop whitespace-only tokens
        ]
        final_text = ' '.join(filtered_words)
        logging.info(f"Completed tokenization and stop word removal, generated {len(filtered_words)} words")
    except Exception as e:
        logging.error(f"Text processing failed: {e}")
        return

    if not filtered_words:
        logging.warning("No words left after stop word removal; skipping word cloud generation")
        return

    # Generate word cloud
    generate_word_cloud(final_text, target_img_src, font_path, res_img_src)
def main():
# 1. 获取数据库连接(交互式输入)
connection = get_db_connection_interactive()
"""
Main function to execute the word cloud generation process.
"""
try:
# Obtain database connection interactively
connection = get_db_connection_interactive()
except Exception:
logging.error("Failed to establish database connection, terminating program")
return
# 2. 根据需求生成词云
# 例如:从 article 表的 content 字段生成词云
try:
# Generate word cloud as per requirements
# Example: Generate word cloud from 'content' field in 'article' table
get_img(
field='content',
table_name='article',
... ... @@ -101,10 +186,16 @@ def main():
res_img_src='./static/contentCloud.jpg',
connection=connection
)
print("词云生成完毕!")
print("Word cloud generation completed!")
except Exception as e:
logging.error(f"An error occurred during word cloud generation: {e}")
finally:
# 关闭数据库连接
connection.close()
# Close the database connection
try:
connection.close()
logging.info("Database connection closed")
except Exception as e:
logging.error(f"Error closing database connection: {e}")
if __name__ == '__main__':
main()
... ...