戒酒的李白

Replace the hard-coded database credentials with an interactive command-line prompt for the database connection settings.

... ... @@ -12,3 +12,4 @@ model2/*
*.pyz
*.pywz
.vscode
.VSCodeCounter
\ No newline at end of file
... ...
import os
from sqlalchemy import create_engine
import pandas as pd
from spiderDataPackage.settings import articleAddr,commentsAddr
# from ..model.topicDefine import *
from sqlalchemy import create_engine
from getpass import getpass
import logging
# Configure logging: records at INFO level and above are written both to the
# file "save_data.log" and to a StreamHandler created with its default stream.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.FileHandler("save_data.log"),
        logging.StreamHandler()
    ]
)
engine = create_engine('mysql+pymysql://XiaoXueQi:XiaoXueQi@47.92.235.6/Weibo_PublicOpinion_AnalysisSystem?charset=utf8mb4')
# 假设 articleAddr 和 commentsAddr 是绝对路径或相对于脚本的路径
from spiderDataPackage.settings import articleAddr, commentsAddr
def saveData():
def get_db_connection_interactive():
    """Prompt on the terminal for MySQL settings and return a SQLAlchemy engine.

    Each prompt accepts an empty answer (just pressing Enter), in which case
    the default shown in the prompt is used. An unparsable port falls back to
    3306 with a warning. The password is read with ``getpass`` so it is not
    echoed.

    Returns:
        The engine created by ``create_engine`` after one successful test
        connection.

    Raises:
        SystemExit: with status 1 when the engine cannot be created or the
            test connection fails (the error is logged first).
    """
    # Local import: only this function needs it.
    from urllib.parse import quote_plus

    print("请依次输入数据库连接信息(直接按回车使用默认值):")
    host = input(" 1. 主机 (默认: localhost): ") or "localhost"
    port_str = input(" 2. 端口 (默认: 3306): ") or "3306"
    try:
        port = int(port_str)
    except ValueError:
        logging.warning("端口号无效,使用默认端口 3306。")
        port = 3306
    user = input(" 3. 用户名 (默认: root): ") or "root"
    password = getpass(" 4. 密码 (默认: 12345678): ") or "12345678"
    db_name = input(" 5. 数据库名 (默认: Weibo_PublicOpinion_AnalysisSystem): ") or "Weibo_PublicOpinion_AnalysisSystem"

    # Build the connection URL. User name and password are percent-encoded so
    # that characters such as "@", ":" or "/" cannot corrupt the URL structure.
    connection_str = (
        f"mysql+pymysql://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{db_name}?charset=utf8mb4"
    )
    try:
        engine = create_engine(connection_str)
        # Open (and immediately release) one connection to verify the settings.
        with engine.connect():
            logging.info(f"成功连接到数据库: {user}@{host}:{port}/{db_name}")
        return engine
    except Exception as e:
        logging.error(f"无法连接到数据库: {e}")
        # SystemExit instead of the site-provided exit() helper, which is not
        # guaranteed to exist in every interpreter configuration.
        raise SystemExit(1)
def saveData(engine):
    """Merge freshly scraped CSV rows into the ``article`` and ``comments`` tables.

    Steps:
      1. Read the current contents of both tables through ``engine``.
      2. Read the new rows from the CSV files at ``articleAddr`` and
         ``commentsAddr``.
      3. Concatenate new + old, then drop duplicates (articles by ``id``,
         comments by ``content``). Because the new rows come first and
         ``keep='last'`` is used, the row already in the database wins when a
         duplicate is found.
      4. Replace both tables with the merged frames.
      5. Only if everything above succeeded, delete the two CSV files.

    Errors are logged rather than raised; on any failure the CSV files are
    left in place.

    Args:
        engine: SQLAlchemy engine (or any connectable accepted by pandas).
    """
    try:
        # Existing rows.
        oldArticle = pd.read_sql('SELECT * FROM article', engine)
        oldComment = pd.read_sql('SELECT * FROM comments', engine)
        logging.info("成功从数据库读取旧的文章和评论数据。")

        # Newly scraped rows.
        newArticle = pd.read_csv(articleAddr)
        newComment = pd.read_csv(commentsAddr)
        logging.info("成功从CSV文件读取新的文章和评论数据。")

        # Merge.
        mergeArticle = pd.concat([newArticle, oldArticle], ignore_index=True, sort=False)
        mergeComment = pd.concat([newComment, oldComment], ignore_index=True, sort=False)
        logging.info("成功合并新旧文章和评论数据。")

        # De-duplicate.
        mergeArticle.drop_duplicates(subset='id', keep='last', inplace=True)
        mergeComment.drop_duplicates(subset='content', keep='last', inplace=True)
        logging.info("成功去除重复的文章和评论数据。")

        # Write back, replacing the existing tables.
        mergeArticle.to_sql('article', con=engine, if_exists='replace', index=False)
        mergeComment.to_sql('comments', con=engine, if_exists='replace', index=False)
        logging.info("成功将合并后的数据保存回数据库。")
    except pd.errors.EmptyDataError as e:
        logging.error(f"读取CSV文件时出错: {e}")
    except Exception as e:
        # NOTE(review): a missing table on a fresh database also lands here,
        # so nothing is imported in that case - confirm this is intended.
        logging.error(f"保存数据时出错: {e}")
    else:
        # The data is safely stored; the CSV files are no longer needed.
        try:
            os.remove(articleAddr)
            os.remove(commentsAddr)
            logging.info("成功删除CSV文件。")
        except Exception as e:
            logging.warning(f"删除CSV文件时出错: {e}")
def main():
    """Ask for the database settings, import the CSV data, then release the engine."""
    engine = get_db_connection_interactive()
    try:
        saveData(engine)
    finally:
        # Dispose of the connection pool even if the import is interrupted
        # (e.g. by Ctrl+C), so no connection is left open.
        engine.dispose()
        logging.info("数据库连接已关闭。")
# Script entry point: main() obtains the engine itself and passes it to
# saveData(engine), which cannot be called without that argument.
if __name__ == '__main__':
    main()
... ...
from pymysql import *
conn = connect(host='47.92.235.6',port=3306,user='XiaoXueQi',password='XiaoXueQi',database='Weibo_PublicOpinion_AnalysisSystem')
import getpass
import pymysql
import logging
# Configure logging: records at INFO level and above are written both to the
# file "database_operations.log" and to a StreamHandler created with its
# default stream.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.FileHandler("database_operations.log"),
        logging.StreamHandler()
    ]
)
def get_db_connection_interactive():
    """Collect MySQL settings from the terminal and open a pymysql connection.

    An empty answer to any prompt selects the default shown in that prompt;
    an unparsable port is replaced by 3306 with a warning. The connection is
    created with ``DictCursor`` as its cursor class.

    Returns:
        The open pymysql connection.

    On a ``pymysql.MySQLError`` the error is logged and the process exits
    with status 1.
    """
    print("请依次输入数据库连接信息(直接按回车使用默认值):")

    host = input(" 1. 主机 (默认: localhost): ")
    if not host:
        host = "localhost"

    raw_port = input(" 2. 端口 (默认: 3306): ")
    if not raw_port:
        raw_port = "3306"
    try:
        port = int(raw_port)
    except ValueError:
        logging.warning("端口号无效,使用默认端口 3306。")
        port = 3306

    user = input(" 3. 用户名 (默认: root): ")
    if not user:
        user = "root"

    password = getpass.getpass(" 4. 密码 (默认: 312517): ")
    if not password:
        password = "312517"

    db_name = input(" 5. 数据库名 (默认: Weibo_PublicOpinion_AnalysisSystem): ")
    if not db_name:
        db_name = "Weibo_PublicOpinion_AnalysisSystem"

    logging.info(f"尝试连接到数据库: {user}@{host}:{port}/{db_name}")
    try:
        connection = pymysql.connect(
            host=host,
            port=port,
            user=user,
            password=password,
            database=db_name,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,  # rows come back as dicts
        )
    except pymysql.MySQLError as e:
        logging.error(f"数据库连接失败: {e}")
        exit(1)
    else:
        logging.info("数据库连接成功。")
        return connection
# Open the module-wide database connection by prompting on the terminal.
# NOTE(review): this runs at module level, so it executes (and waits for
# input) as soon as the module is loaded.
conn = get_db_connection_interactive()
# Module-wide cursor; query() issues every statement through it.
cursor = conn.cursor()
def query(sql,params,type="no_select"):
def query(sql, params=None, query_type="no_select"):
    """Execute one SQL statement on the module-level connection.

    Args:
        sql: SQL text, optionally containing ``%s`` placeholders.
        params: Optional sequence of placeholder values; converted to a tuple
            before being handed to the cursor. Falsy values (``None``, empty
            sequence) mean "no parameters".
        query_type: ``"no_select"`` (default) for statements whose result is
            not fetched; any other value makes the function fetch and return
            all rows.

    Returns:
        The rows from ``cursor.fetchall()`` when ``query_type`` is not
        ``"no_select"``; otherwise ``None``. ``None`` is also returned when a
        ``pymysql.MySQLError`` occurs (the error is logged and the
        transaction rolled back), so callers cannot tell a failed statement
        from a successful non-select one by the return value alone.
    """
    try:
        # Make sure the connection is alive *before* sending the statement;
        # pinging afterwards cannot rescue a statement that already failed.
        conn.ping(reconnect=True)

        # Execute exactly once.
        if params:
            cursor.execute(sql, tuple(params))
        else:
            cursor.execute(sql)

        if query_type != "no_select":
            data_list = cursor.fetchall()
            conn.commit()
            logging.info("查询成功,已获取数据。")
            return data_list

        conn.commit()
        logging.info("操作成功,已提交事务。")
        return None
    except pymysql.MySQLError as e:
        logging.error(f"执行SQL时出错: {e}")
        conn.rollback()
        return None
def main():
    """Demonstrate query(): print up to five articles, insert one sample row, then close.

    The INSERT column list is a sample and has to match the real table
    structure.
    """
    # Read example.
    rows = query("SELECT * FROM article LIMIT 5", query_type="select")
    for row in rows or ():
        print(row)

    # Write example.
    outcome = query(
        "INSERT INTO article (id, content) VALUES (%s, %s)",
        params=(12345, "这是一条新的文章内容。"),
        query_type="no_select",
    )
    if outcome is None:
        logging.info("插入操作完成。")

    # Release the module-level cursor and connection.
    cursor.close()
    conn.close()
    logging.info("数据库连接已关闭。")
# Run the demo in main() only when this file is executed as a script.
if __name__ == '__main__':
    main()
... ...
import os
import jieba
from wordcloud import WordCloud
import matplotlib.pyplot as plt
from PIL import Image,ImageDraw
from pymysql import *
import json
from PIL import Image
import numpy as np
def stopWordList():
return [line.strip() for line in open('./model/stopWords.txt',encoding='utf8').readlines()]
def get_img(field,tableName,targetImgSrc,resImgSrc):
con = connect(host='47.92.235.6',user='XiaoXueQi',password='XiaoXueQi',database='Weibo_PublicOpinion_AnalysisSystem',port=3306,charset='utf8mb4')
cuser = con.cursor()
sql = f'select {field} from {tableName}'
cuser.execute(sql)
data = cuser.fetchall()
text = ''
for item in data:
text += item[0]
cuser.close()
con.close()
import pymysql
cut = jieba.cut(text)
newCut = []
for word in cut:
if word not in stopWordList():newCut.append(word)
string = ' '.join(newCut)
def stopWordList(path='./model/stopWords.txt'):
    """Return the stop words stored one per line in a UTF-8 text file.

    Args:
        path: Location of the stop-word file. Defaults to
            ``./model/stopWords.txt`` (relative to the current working
            directory).

    Returns:
        A list with one entry per line of the file, each stripped of
        surrounding whitespace. Blank lines yield empty strings.

    The file is re-read on every call; callers that need the list repeatedly
    should keep the result instead of calling again.
    """
    with open(path, encoding='utf8') as f:
        # Iterate the file directly instead of materialising readlines().
        return [line.strip() for line in f]
img = Image.open(targetImgSrc)
def generate_word_cloud(text, mask_path, font_path, output_path):
    """Render a word cloud for ``text`` and save it as an image file.

    Args:
        text: Space-separated words to draw.
        mask_path: Image file used as the mask for the cloud.
        font_path: Font file passed to ``WordCloud``.
        output_path: Destination of the rendered figure (saved at 300 dpi
            with a tight bounding box and the axes hidden).
    """
    # Load the mask and release the image file handle right away.
    with Image.open(mask_path) as img:
        img_arr = np.array(img)

    wc = WordCloud(
        background_color="#fff",
        mask=img_arr,
        font_path=font_path,
    )
    wc.generate_from_text(text)

    fig = plt.figure(figsize=(8, 6))
    try:
        plt.imshow(wc, interpolation='bilinear')
        plt.axis('off')
        plt.savefig(output_path, dpi=300, bbox_inches='tight')
    finally:
        # Close the figure even when saving fails so it does not accumulate.
        plt.close(fig)
def get_db_connection_interactive():
    """Prompt on the terminal for MySQL settings and return a pymysql connection.

    An empty answer to any prompt selects the default shown in that prompt.
    An unparsable port falls back to 3306 with a printed warning instead of
    raising ``ValueError``. The password is read with ``getpass`` so it is
    not echoed, and the default named in the password prompt matches the
    value actually used.

    Returns:
        The connection returned by ``pymysql.connect`` (utf8mb4 charset).
        Connection errors from ``pymysql.connect`` propagate to the caller.
    """
    # Local import: this file does not import getpass at module level.
    from getpass import getpass

    print("请依次输入数据库连接信息(直接按回车使用默认值):")
    host = input(" 1. 主机 (默认: localhost): ") or "localhost"
    port_str = input(" 2. 端口 (默认: 3306): ") or "3306"
    try:
        port = int(port_str)
    except ValueError:
        print("端口号无效,使用默认端口 3306。")
        port = 3306
    user = input(" 3. 用户名 (默认: root): ") or "root"
    password = getpass(" 4. 密码 (默认: 12345678): ") or "12345678"
    db_name = input(" 5. 数据库名 (默认: Weibo_PublicOpinion_AnalysisSystem): ") or "Weibo_PublicOpinion_AnalysisSystem"
    print(f"\n即将连接到数据库: {user}@{host}:{port}/{db_name}\n")
    return pymysql.connect(
        host=host,
        user=user,
        password=password,
        database=db_name,
        port=port,
        charset='utf8mb4'
    )
def get_img(field, table_name, target_img_src, res_img_src, connection, font_path='STHUPO.TTF'):
    """Build a word cloud from one text column of a database table.

    The column is read in full, concatenated, segmented with jieba, filtered
    against the stop-word list and rendered with ``generate_word_cloud``.

    Args:
        field: Column name to read.
        table_name: Table to read from.
        target_img_src: Mask image that shapes the cloud.
        res_img_src: Output path of the rendered cloud.
        connection: Open database connection; it is not closed here. Rows are
            indexed by position, so the cursor must yield sequences.
        font_path: Font file used for rendering.

    ``field`` and ``table_name`` are interpolated directly into the SQL text
    (identifiers cannot be bound as parameters), so they must come from
    trusted code, never from user input.
    """
    sql = f'SELECT {field} FROM {table_name}'
    cursor = connection.cursor()
    try:
        cursor.execute(sql)
        data = cursor.fetchall()
    finally:
        # Release the cursor even when the query fails.
        cursor.close()

    # Each row is a one-element sequence; skip NULL/empty values, which would
    # otherwise break string concatenation, and join once instead of "+=".
    text = ''.join(row[0] for row in data if row[0])

    # Segment and drop stop words (set lookup keeps the filter linear).
    stop_words = set(stopWordList())
    filtered_words = [word for word in jieba.cut(text) if word not in stop_words]
    final_text = ' '.join(filtered_words)

    generate_word_cloud(final_text, target_img_src, font_path, res_img_src)
def main():
    """Connect interactively, render the article-content word cloud, then disconnect."""
    connection = get_db_connection_interactive()

    # What to render: the ``content`` column of the ``article`` table.
    cloud_job = {
        'field': 'content',
        'table_name': 'article',
        'target_img_src': './static/content.jpg',
        'res_img_src': './static/contentCloud.jpg',
    }
    try:
        get_img(connection=connection, **cloud_job)
        print("词云生成完毕!")
    finally:
        # Always release the database connection.
        connection.close()
# Script entry point. get_img() requires an open connection as an argument,
# so it is invoked from main(), which creates and closes that connection.
if __name__ == '__main__':
    main()
... ...