wordCloudPicture.py
6.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import os
import jieba
from wordcloud import WordCloud
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
import pymysql
from utils.logger import app_logger as logging
# Module-level cache of stop words; filled on the first successful load.
STOP_WORDS = set()


def load_stop_words():
    """
    Load the stop word list and cache it in the module-level ``STOP_WORDS``.

    The list is read from ``./model/stopWords.txt`` (relative to the current
    working directory), one stop word per line. Surrounding whitespace is
    stripped and blank lines are skipped. Once a non-empty list has been
    loaded, later calls return the cached set without touching the file.

    :return: The set of stop words. An empty set is returned (and an error is
        logged) when the file is missing or cannot be read; the next call
        then tries to load the file again.
    """
    global STOP_WORDS
    if STOP_WORDS:
        return STOP_WORDS

    stop_words_path = './model/stopWords.txt'
    try:
        # 'utf-8-sig' decodes plain UTF-8 and additionally drops a leading
        # byte order mark. With plain 'utf8' the BOM stays glued to the first
        # stop word (str.strip() does not remove it), so that word would
        # never match a token.
        with open(stop_words_path, encoding='utf-8-sig') as f:
            STOP_WORDS = {word for word in map(str.strip, f) if word}
    except FileNotFoundError:
        logging.error(f"Stop words file does not exist: {stop_words_path}")
        return set()
    except (OSError, UnicodeError) as e:
        logging.error(f"Failed to load stop words file: {e}")
    else:
        logging.info(f"Loaded {len(STOP_WORDS)} stop words")
    return STOP_WORDS
def generate_word_cloud(text, mask_path, font_path, output_path):
    """
    Render a word cloud for ``text`` and save it as an image.

    Failures while loading the mask, generating the cloud or saving the
    image are logged and end the function early instead of being raised.

    :param text: Processed text
    :param mask_path: Path to the mask image
    :param font_path: Path to the font file
    :param output_path: Path to save the generated word cloud image
    :return: None
    """
    if not os.path.exists(mask_path):
        logging.error(f"Mask image file does not exist: {mask_path}")
        return

    try:
        # The context manager closes the image's file handle. np.array()
        # copies the pixel data first, so the array stays valid afterwards.
        with Image.open(mask_path) as img:
            img_arr = np.array(img)
        logging.info(f"Successfully loaded mask image: {mask_path}")
    except Exception as e:
        logging.error(f"Failed to load mask image: {e}")
        return

    try:
        wc = WordCloud(
            background_color="#fff",
            mask=img_arr,
            font_path=font_path,
            max_words=2000,
            max_font_size=100,
            random_state=42,  # fixed seed keeps the layout reproducible
            width=800,
            height=600,
        )
        wc.generate_from_text(text)
        logging.info("Word cloud generated successfully")
    except Exception as e:
        logging.error(f"Failed to generate word cloud: {e}")
        return

    fig = None
    try:
        fig = plt.figure(figsize=(8, 6))
        plt.imshow(wc, interpolation='bilinear')
        plt.axis('off')
        plt.savefig(output_path, dpi=300, bbox_inches='tight')
        logging.info(f"Word cloud saved to: {output_path}")
    except Exception as e:
        logging.error(f"Failed to save word cloud image: {e}")
    finally:
        # Close the figure even when drawing or saving failed; otherwise
        # pyplot keeps it registered and it is never released.
        if fig is not None:
            plt.close(fig)
def get_db_connection_interactive():
    """
    Interactively obtain database connection parameters from the terminal
    and open a MySQL connection with them.

    Press Enter at any prompt to use its default value. The password is read
    with ``getpass`` so it is not echoed to the terminal. A port that is not
    an integer is logged as an error and replaced by the default 3306.

    :return: pymysql.connections.Connection object
    :raises pymysql.MySQLError: if the connection cannot be established
        (the error is logged before being re-raised)
    """
    # Imported locally: only this interactive helper needs it.
    import getpass

    print("Please enter database connection information (press Enter to use default values):")
    # strip() so that an answer made only of spaces falls back to the default.
    host = input(" 1. Host (default: localhost): ").strip() or "localhost"
    port_str = input(" 2. Port (default: 3306): ").strip() or "3306"
    try:
        port = int(port_str)
    except ValueError:
        logging.error(f"Invalid port number: {port_str}")
        port = 3306
    user = input(" 3. Username (default: root): ").strip() or "root"
    # Not stripped: leading/trailing spaces can be part of a password.
    password = getpass.getpass(" 4. Password (default: 12345678): ") or "12345678"
    db_name = (
        input(" 5. Database name (default: Weibo_PublicOpinion_AnalysisSystem): ").strip()
        or "Weibo_PublicOpinion_AnalysisSystem"
    )

    # The password is deliberately left out of the log line.
    logging.info(f"Attempting to connect to database: {user}@{host}:{port}/{db_name}")
    try:
        connection = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=db_name,
            port=port,
            charset='utf8mb4',
        )
        logging.info("Database connection successful")
        return connection
    except pymysql.MySQLError as e:
        logging.error(f"Database connection failed: {e}")
        raise
def get_img(field, table_name, target_img_src, res_img_src, connection, font_path='STHUPO.TTF'):
    """
    Retrieve text data from a specified field and table in the database,
    perform word segmentation and stop word removal, then generate a word cloud.

    Query and text-processing failures are logged and end the function early.
    When the query yields no non-empty text, a warning is logged and no word
    cloud is generated.

    :param field: Database field name
    :param table_name: Database table name
    :param target_img_src: Path to the mask image
    :param res_img_src: Path to save the generated word cloud image
    :param connection: Established database connection
    :param font_path: Path to the font file
    :return: None
    """
    try:
        with connection.cursor() as cursor:
            # NOTE(review): field and table_name are interpolated into the
            # SQL text (identifiers cannot be bound as query parameters), so
            # they must come from trusted code, never from user input.
            sql = f'SELECT {field} FROM {table_name}'
            cursor.execute(sql)
            data = cursor.fetchall()
        logging.info(f"Fetched {len(data)} records from '{table_name}' table, field '{field}'")
    except pymysql.MySQLError as e:
        logging.error(f"Database query failed: {e}")
        return

    # Join records with a newline so the end of one record and the start of
    # the next cannot be segmented together as one spurious word.
    text = '\n'.join(item[0] for item in data if item[0])
    if not text.strip():
        logging.warning(
            f"No text found in table '{table_name}', field '{field}'; "
            "skipping word cloud generation"
        )
        return

    # Tokenization & Stop word removal
    try:
        stop_words = load_stop_words()
        if not stop_words:
            logging.warning("Stop words set is empty, proceeding without stop word removal")
        # Whitespace-only tokens (including the record separators) carry no
        # meaning and are dropped together with the stop words.
        filtered_words = [
            word for word in jieba.cut(text)
            if word.strip() and word not in stop_words
        ]
        final_text = ' '.join(filtered_words)
        logging.info(f"Completed tokenization and stop word removal, generated {len(filtered_words)} words")
    except Exception as e:
        logging.error(f"Text processing failed: {e}")
        return

    # Generate word cloud
    generate_word_cloud(final_text, target_img_src, font_path, res_img_src)
def main():
    """
    Run the word cloud workflow end to end.

    Asks for the database connection interactively, builds the word cloud
    for the 'content' field of the 'article' table, and always closes the
    connection afterwards. Errors are logged rather than raised.
    """
    try:
        db_conn = get_db_connection_interactive()
    except Exception:
        logging.error("Failed to establish database connection, terminating program")
        return

    # Example job: word cloud from the 'content' field of the 'article' table.
    cloud_job = {
        'field': 'content',
        'table_name': 'article',
        'target_img_src': './static/content.jpg',
        'res_img_src': './static/contentCloud.jpg',
    }

    try:
        get_img(connection=db_conn, **cloud_job)
        print("Word cloud generation completed!")
    except Exception as exc:
        logging.error(f"An error occurred during word cloud generation: {exc}")
    finally:
        # Release the connection whether or not generation succeeded.
        try:
            db_conn.close()
            logging.info("Database connection closed")
        except Exception as exc:
            logging.error(f"Error closing database connection: {exc}")


if __name__ == '__main__':
    main()