# wordCloudPicture.py
import os
import jieba
from wordcloud import WordCloud
import matplotlib.pyplot as plt
from PIL import Image
import numpy as np
import pymysql
from utils.logger import app_logger as logging

# Module-level cache: filled by the first successful load and reused afterwards.
STOP_WORDS = set()


def load_stop_words():
    """
    Return the stop word set, reading it from disk on first use.

    A non-empty cache is returned as-is. Otherwise the stop words file is
    read, one entry per line, with surrounding whitespace stripped and blank
    lines skipped. A missing file is logged and yields a fresh empty set; a
    read failure is logged and yields whatever the cache currently holds.

    :return: Set of stop word strings (possibly empty)
    """
    global STOP_WORDS

    if not STOP_WORDS:
        source = './model/stopWords.txt'
        if not os.path.exists(source):
            logging.error(f"Stop words file does not exist: {source}")
            return set()
        try:
            with open(source, encoding='utf8') as handle:
                stripped = (raw.strip() for raw in handle)
                # The cache is rebound only after the whole file has been read.
                STOP_WORDS = {entry for entry in stripped if entry}
            logging.info(f"Loaded {len(STOP_WORDS)} stop words")
        except Exception as exc:
            logging.error(f"Failed to load stop words file: {exc}")
    return STOP_WORDS

def generate_word_cloud(text, mask_path, font_path, output_path):
    """
    Generate a word cloud and save it to output_path.

    Every failure (missing mask, unreadable mask, generation error, save
    error) is logged and the function returns without raising.

    :param text: Processed text
    :param mask_path: Path to the mask image
    :param font_path: Path to the font file
    :param output_path: Path to save the generated word cloud image
    :return: None
    """
    if not os.path.exists(mask_path):
        logging.error(f"Mask image file does not exist: {mask_path}")
        return
    try:
        # Use the image as a context manager so the underlying file handle is
        # released as soon as the pixels have been copied into the array.
        with Image.open(mask_path) as img:
            img_arr = np.array(img)
        logging.info(f"Successfully loaded mask image: {mask_path}")
    except Exception as e:
        logging.error(f"Failed to load mask image: {e}")
        return

    try:
        wc = WordCloud(
            background_color="#fff",
            mask=img_arr,
            font_path=font_path,
            max_words=2000,
            max_font_size=100,
            random_state=42,  # fixed seed so repeated runs give the same layout
            width=800,
            height=600
        )
        wc.generate_from_text(text)
        logging.info("Word cloud generated successfully")
    except Exception as e:
        logging.error(f"Failed to generate word cloud: {e}")
        return

    try:
        fig = plt.figure(figsize=(8, 6))
        try:
            plt.imshow(wc, interpolation='bilinear')
            plt.axis('off')
            plt.savefig(output_path, dpi=300, bbox_inches='tight')
        finally:
            # Close this figure even when drawing or saving fails; otherwise
            # it stays registered with pyplot and its memory is never freed.
            plt.close(fig)
        logging.info(f"Word cloud saved to: {output_path}")
    except Exception as e:
        logging.error(f"Failed to save word cloud image: {e}")

def get_db_connection_interactive():
    """
    Prompt on the terminal for MySQL connection settings and connect.

    Each prompt accepts an empty answer (just Enter), in which case the
    default shown in the prompt is used. A port that is not an integer is
    logged and replaced by 3306.

    :return: pymysql.connections.Connection object
    :raises pymysql.MySQLError: re-raised after logging when the connection fails
    """
    print("Please enter database connection information (press Enter to use default values):")

    def ask(prompt, fallback):
        # An empty answer selects the fallback value.
        answer = input(prompt)
        return answer if answer else fallback

    host = ask(" 1. Host (default: localhost): ", "localhost")

    raw_port = ask(" 2. Port (default: 3306): ", "3306")
    try:
        port = int(raw_port)
    except ValueError:
        logging.error(f"Invalid port number: {raw_port}")
        port = 3306

    user = ask(" 3. Username (default: root): ", "root")
    password = ask(" 4. Password (default: 12345678): ", "12345678")
    db_name = ask(
        " 5. Database name (default: Weibo_PublicOpinion_AnalysisSystem): ",
        "Weibo_PublicOpinion_AnalysisSystem",
    )

    logging.info(f"Attempting to connect to database: {user}@{host}:{port}/{db_name}")

    settings = {
        'host': host,
        'user': user,
        'password': password,
        'database': db_name,
        'port': port,
        'charset': 'utf8mb4',
    }
    try:
        connection = pymysql.connect(**settings)
    except pymysql.MySQLError as exc:
        logging.error(f"Database connection failed: {exc}")
        raise
    else:
        logging.info("Database connection successful")
        return connection

def get_img(field, table_name, target_img_src, res_img_src, connection, font_path='STHUPO.TTF'):
    """
    Retrieve text data from a specified field and table in the database,
    perform word segmentation and stop word removal, then generate a word cloud.

    Query and text-processing failures are logged and the function returns
    without raising. When no words remain after filtering, a warning is
    logged and no image is generated.

    :param field: Database field name
    :param table_name: Database table name
    :param target_img_src: Path to the mask image
    :param res_img_src: Path to save the generated word cloud image
    :param connection: Established database connection
    :param font_path: Path to the font file
    :return: None
    """
    try:
        with connection.cursor() as cursor:
            # NOTE(review): SQL identifiers cannot be bound as query
            # parameters, so field/table_name are interpolated directly.
            # Callers must pass trusted values only.
            sql = f'SELECT {field} FROM {table_name}'
            cursor.execute(sql)
            data = cursor.fetchall()
        logging.info(f"Fetched {len(data)} records from '{table_name}' table, field '{field}'")
    except pymysql.MySQLError as e:
        logging.error(f"Database query failed: {e}")
        return

    # Rows are indexed positionally (item[0]) -- assumes a tuple-returning
    # cursor; TODO confirm no DictCursor is configured on the connection.
    # Records are joined with a newline so the end of one row cannot fuse
    # with the start of the next into a token neither row contains.
    text = '\n'.join(item[0] for item in data if item[0])

    # Tokenization & Stop word removal
    try:
        stop_words = load_stop_words()
        if not stop_words:
            logging.warning("Stop words set is empty, proceeding without stop word removal")
        # Whitespace-only tokens (spaces, the record separators) carry no
        # meaning for the cloud and are dropped along with the stop words.
        filtered_words = [
            word for word in jieba.cut(text)
            if word.strip() and word not in stop_words
        ]
        final_text = ' '.join(filtered_words)
        logging.info(f"Completed tokenization and stop word removal, generated {len(filtered_words)} words")
    except Exception as e:
        logging.error(f"Text processing failed: {e}")
        return

    if not filtered_words:
        # Nothing to draw: report the real cause instead of letting the
        # generator fail on empty input.
        logging.warning(f"No words available from '{table_name}'.'{field}', skipping word cloud generation")
        return

    # Generate word cloud
    generate_word_cloud(final_text, target_img_src, font_path, res_img_src)

def main():
    """
    Run the word cloud workflow end to end.

    Opens a database connection interactively, builds the word cloud for the
    'content' field of the 'article' table, and always closes the connection
    afterwards. Errors are logged rather than propagated.
    """
    # Step 1: obtain the connection; without it there is nothing to do.
    try:
        db = get_db_connection_interactive()
    except Exception:
        logging.error("Failed to establish database connection, terminating program")
        return

    # Step 2: generate the word cloud as per requirements.
    # Example: 'content' field in 'article' table.
    job = {
        'field': 'content',
        'table_name': 'article',
        'target_img_src': './static/content.jpg',
        'res_img_src': './static/contentCloud.jpg',
    }
    try:
        get_img(connection=db, **job)
        print("Word cloud generation completed!")
    except Exception as err:
        logging.error(f"An error occurred during word cloud generation: {err}")
    finally:
        # Step 3: release the connection whatever happened above.
        try:
            db.close()
            logging.info("Database connection closed")
        except Exception as close_err:
            logging.error(f"Error closing database connection: {close_err}")

# Run the interactive workflow only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()