# You need to sign in or sign up before continuing.
# LSTM_model.py 14.7 KB
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import jieba
from transformers import BertTokenizer
import logging
import os

# Configure logging once for the process and expose this module's logger.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger('LSTM_model')

class TextDataset(Dataset):
    """Dataset that tokenizes raw texts on access and pairs them with labels.

    Args:
        texts: Iterable of raw texts (list, NumPy array, pandas Series, ...).
        labels: Iterable of integer class labels aligned with ``texts``.
        tokenizer: Object exposing ``encode_plus`` accepting the keyword
            arguments used in ``__getitem__``.
        max_length: Length every encoded sequence is padded/truncated to.
    """

    def __init__(self, texts, labels, tokenizer, max_length=128):
        # Materialise as lists so ``__getitem__`` always indexes by position.
        # A pandas Series indexes by label, so a Series whose index is not
        # 0..n-1 (e.g. after a shuffled split) would otherwise raise KeyError
        # or return the wrong row.
        self.texts = list(texts)
        self.labels = list(labels)
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        """Return the number of samples."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Encode the sample at position ``idx``.

        Returns:
            dict with keys ``'text'`` (the raw string), ``'input_ids'`` and
            ``'attention_mask'`` (flattened tensors from the tokenizer) and
            ``'label'`` (a ``torch.long`` scalar tensor).
        """
        text = str(self.texts[idx])
        label = self.labels[idx]

        # Tokenize to a fixed length; the tokenizer returns tensors with a
        # leading batch dimension which is removed by ``flatten`` below.
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt'
        )

        return {
            'text': text,
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'label': torch.tensor(label, dtype=torch.long)
        }

class LSTMSentimentModel(nn.Module):
    """LSTM text classifier: embedding -> (bi)LSTM -> dropout -> linear head.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each token embedding.
        hidden_dim: LSTM hidden size per direction.
        output_dim: Number of output logits (classes).
        n_layers: Number of stacked LSTM layers.
        bidirectional: Whether the LSTM runs in both directions.
        dropout: Dropout probability applied to the embeddings, between LSTM
            layers (only when ``n_layers > 1``) and to the final hidden state.
        pad_idx: Token id used as ``padding_idx`` of the embedding.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers=2,
                 bidirectional=True, dropout=0.5, pad_idx=0):
        super().__init__()

        # Token embedding; the row at ``pad_idx`` is the padding embedding.
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)

        # nn.LSTM warns when inter-layer dropout is requested for a single
        # layer, so it is only enabled for stacked LSTMs.
        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers=n_layers,
            bidirectional=bidirectional,
            dropout=dropout if n_layers > 1 else 0,
            batch_first=True
        )

        # A bidirectional LSTM yields two final states that are concatenated,
        # so the classifier input is twice as wide.
        self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self, text, attention_mask=None):
        """Compute class logits for a batch of token-id sequences.

        Args:
            text: Token ids of shape ``[batch_size, seq_len]``.
            attention_mask: Optional mask of the same shape whose row sums are
                used as sequence lengths, so padding is expected at the end
                of each row. When omitted, full sequences are processed.

        Returns:
            Tensor of shape ``[batch_size, output_dim]`` with unnormalised
            logits.
        """
        # [batch_size, seq_len] -> [batch_size, seq_len, embedding_dim]
        embedded = self.dropout(self.embedding(text))

        if attention_mask is not None:
            # pack_padded_sequence rejects non-positive lengths, so a row made
            # entirely of padding is treated as a length-1 sequence instead of
            # crashing the whole batch.
            lengths = attention_mask.sum(dim=1).clamp(min=1).to('cpu')
            packed_embedded = nn.utils.rnn.pack_padded_sequence(
                embedded, lengths, batch_first=True, enforce_sorted=False
            )
            # Only the final hidden state is needed; per-step outputs are
            # discarded, so the packed output is never unpacked.
            _, (hidden, _) = self.lstm(packed_embedded)
        else:
            _, (hidden, _) = self.lstm(embedded)

        if self.lstm.bidirectional:
            # The last two entries are the top layer's forward and backward
            # final states.
            hidden = torch.cat([hidden[-2], hidden[-1]], dim=1)
        else:
            hidden = hidden[-1]

        hidden = self.dropout(hidden)

        return self.fc(hidden)

class LSTMModelManager:
    """Owns a tokenizer and an ``LSTMSentimentModel``; trains, evaluates and predicts.

    Args:
        bert_model_path: Passed to ``BertTokenizer.from_pretrained``.
        model_save_path: Optional checkpoint file. If it exists when the
            manager is constructed, its state dict is loaded into the model;
            ``train`` writes checkpoints to it.
        vocab_size: Number of rows in the model's embedding table. Must be at
            least as large as the tokenizer vocabulary.
        embedding_dim, hidden_dim, output_dim, n_layers, bidirectional,
        dropout: Forwarded unchanged to ``LSTMSentimentModel``.
    """

    def __init__(self, bert_model_path, model_save_path=None, vocab_size=30522,
                 embedding_dim=128, hidden_dim=256, output_dim=2, n_layers=2,
                 bidirectional=True, dropout=0.5):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.tokenizer = BertTokenizer.from_pretrained(bert_model_path)
        self.vocab_size = vocab_size

        # Token ids index directly into the embedding table, so a tokenizer
        # larger than the table leads to out-of-range lookups at run time.
        tokenizer_size = len(self.tokenizer)
        if tokenizer_size > vocab_size:
            logger.warning(
                "Tokenizer vocabulary (%d) exceeds vocab_size (%d); "
                "token ids beyond the embedding table will fail at lookup",
                tokenizer_size, vocab_size,
            )

        self.model = LSTMSentimentModel(
            vocab_size=vocab_size,
            embedding_dim=embedding_dim,
            hidden_dim=hidden_dim,
            output_dim=output_dim,
            n_layers=n_layers,
            bidirectional=bidirectional,
            dropout=dropout,
            pad_idx=self.tokenizer.pad_token_id
        ).to(self.device)

        self.model_save_path = model_save_path
        if model_save_path and os.path.exists(model_save_path):
            # NOTE(review): torch.load unpickles the file; only point
            # model_save_path at checkpoints from a trusted source.
            self.model.load_state_dict(torch.load(model_save_path, map_location=self.device))
            logger.info(f"已从 {model_save_path} 加载模型")

    def _make_loader(self, texts, labels, batch_size, shuffle=False):
        """Wrap texts/labels in a ``TextDataset`` and return a ``DataLoader`` over it."""
        dataset = TextDataset(texts, labels, self.tokenizer)
        return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

    def _run_inference(self, dataloader, criterion=None, collect_probs=True):
        """Run the model over ``dataloader`` in eval mode without gradients.

        Args:
            dataloader: Yields batches with ``'input_ids'`` and
                ``'attention_mask'`` (and ``'label'`` when ``criterion`` is given).
            criterion: Optional loss function; when given, the per-batch loss
                is summed and the labels are collected.
            collect_probs: Whether to also collect softmax probabilities.

        Returns:
            ``(summed_loss, predictions, probabilities, labels)``; the last
            three are flat Python lists (possibly empty).
        """
        self.model.eval()
        summed_loss = 0
        predictions = []
        probabilities = []
        seen_labels = []

        with torch.no_grad():
            for batch in dataloader:
                input_ids = batch['input_ids'].to(self.device)
                attention_mask = batch['attention_mask'].to(self.device)
                outputs = self.model(input_ids, attention_mask)

                if criterion is not None:
                    labels = batch['label'].to(self.device)
                    summed_loss += criterion(outputs, labels).item()
                    seen_labels.extend(labels.cpu().numpy())

                if collect_probs:
                    probabilities.extend(torch.softmax(outputs, dim=1).cpu().numpy())

                _, predicted = torch.max(outputs, 1)
                predictions.extend(predicted.cpu().numpy())

        return summed_loss, predictions, probabilities, seen_labels

    def train(self, train_texts, train_labels, val_texts=None, val_labels=None,
              batch_size=32, learning_rate=2e-5, epochs=10, validation_split=0.2):
        """Train the model and checkpoint the epoch with the lowest validation loss.

        Args:
            train_texts, train_labels: Training samples and their labels.
            val_texts, val_labels: Optional validation set. If either is
                missing, ``validation_split`` of the training data is held out.
            batch_size: Batch size for both loaders.
            learning_rate: Learning rate for the Adam optimizer.
            epochs: Number of passes over the training data (at least 1).
            validation_split: Hold-out fraction used when no validation set
                is supplied.

        Returns:
            ``(train_loss, val_loss, val_accuracy, val_f1)`` of the final epoch.
            The in-memory model holds the final epoch's weights; the file at
            ``model_save_path`` holds the best-validation-loss weights.

        Raises:
            ValueError: If ``epochs`` is smaller than 1.
        """
        # Without at least one epoch there are no metrics to return.
        if epochs < 1:
            raise ValueError(f"epochs must be at least 1, got {epochs}")

        logger.info("开始训练模型...")

        # Hold out part of the training data when no validation set is given.
        if val_texts is None or val_labels is None:
            train_texts, val_texts, train_labels, val_labels = train_test_split(
                train_texts, train_labels, test_size=validation_split, random_state=42
            )

        train_dataloader = self._make_loader(train_texts, train_labels, batch_size, shuffle=True)
        val_dataloader = self._make_loader(val_texts, val_labels, batch_size)

        optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        criterion = nn.CrossEntropyLoss()

        best_val_loss = float('inf')
        for epoch in range(epochs):
            # ---- training pass ----
            self.model.train()
            train_loss = 0
            train_preds = []
            train_labels_list = []

            for batch in train_dataloader:
                input_ids = batch['input_ids'].to(self.device)
                attention_mask = batch['attention_mask'].to(self.device)
                labels = batch['label'].to(self.device)

                optimizer.zero_grad()
                outputs = self.model(input_ids, attention_mask)

                loss = criterion(outputs, labels)
                train_loss += loss.item()

                loss.backward()
                optimizer.step()

                _, predicted = torch.max(outputs, 1)
                train_preds.extend(predicted.cpu().numpy())
                train_labels_list.extend(labels.cpu().numpy())

            train_accuracy = accuracy_score(train_labels_list, train_preds)
            train_f1 = f1_score(train_labels_list, train_preds, average='macro')

            # ---- validation pass ----
            val_loss, val_preds, _, val_labels_list = self._run_inference(
                val_dataloader, criterion, collect_probs=False
            )
            val_accuracy = accuracy_score(val_labels_list, val_preds)
            val_f1 = f1_score(val_labels_list, val_preds, average='macro')

            # Convert summed batch losses to per-batch averages.
            train_loss /= len(train_dataloader)
            val_loss /= len(val_dataloader)

            logger.info(f'Epoch {epoch+1}/{epochs} | '
                        f'Train Loss: {train_loss:.4f} | '
                        f'Train Acc: {train_accuracy:.4f} | '
                        f'Train F1: {train_f1:.4f} | '
                        f'Val Loss: {val_loss:.4f} | '
                        f'Val Acc: {val_accuracy:.4f} | '
                        f'Val F1: {val_f1:.4f}')

            # Checkpoint only when validation loss improves.
            if val_loss < best_val_loss and self.model_save_path:
                best_val_loss = val_loss
                torch.save(self.model.state_dict(), self.model_save_path)
                logger.info(f"模型已保存到 {self.model_save_path}")

        # No epoch ever improved on +inf (e.g. the loss was NaN every time):
        # still persist the final weights so the save path is not left empty.
        if self.model_save_path and best_val_loss == float('inf'):
            torch.save(self.model.state_dict(), self.model_save_path)
            logger.info(f"最终模型已保存到 {self.model_save_path}")

        return train_loss, val_loss, val_accuracy, val_f1

    def evaluate(self, test_texts, test_labels, batch_size=32):
        """Evaluate the model on a labelled test set and log the metrics.

        Returns:
            dict with keys ``'loss'`` (mean batch loss), ``'accuracy'``,
            ``'precision'``, ``'recall'``, ``'f1'`` (macro-averaged),
            ``'confusion_matrix'``, ``'predictions'`` and ``'probabilities'``.
        """
        logger.info("评估模型...")

        test_dataloader = self._make_loader(test_texts, test_labels, batch_size)
        criterion = nn.CrossEntropyLoss()

        test_loss, test_preds, test_probs, test_labels_list = self._run_inference(
            test_dataloader, criterion
        )

        # Convert the summed batch loss to a per-batch average.
        test_loss /= len(test_dataloader)

        accuracy = accuracy_score(test_labels_list, test_preds)
        precision = precision_score(test_labels_list, test_preds, average='macro')
        recall = recall_score(test_labels_list, test_preds, average='macro')
        f1 = f1_score(test_labels_list, test_preds, average='macro')
        conf_matrix = confusion_matrix(test_labels_list, test_preds)

        logger.info(f'Test Loss: {test_loss:.4f}')
        logger.info(f'Accuracy: {accuracy:.4f}')
        logger.info(f'Precision: {precision:.4f}')
        logger.info(f'Recall: {recall:.4f}')
        logger.info(f'F1 Score: {f1:.4f}')
        logger.info(f'Confusion Matrix:\n{conf_matrix}')

        return {
            'loss': test_loss,
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'confusion_matrix': conf_matrix,
            'predictions': test_preds,
            'probabilities': test_probs
        }

    def predict_batch(self, texts, batch_size=32):
        """Predict classes and class probabilities for one or more texts.

        Args:
            texts: A single string or a sized collection of strings (list,
                NumPy array, pandas Series, ...).
            batch_size: Batch size used for inference.

        Returns:
            ``(predictions, probabilities)`` as two lists aligned with
            ``texts``, or ``(None, None)`` when ``texts`` is ``None``, an
            empty string or an empty collection.
        """
        if texts is None:
            return None, None

        # A bare string is a single sample; an empty string counts as no input.
        if isinstance(texts, str):
            texts = [texts] if texts else []

        # ``len`` instead of truthiness: NumPy arrays and pandas Series raise
        # on ``not texts`` when they hold more than one element.
        if len(texts) == 0:
            return None, None

        # The dataset requires labels; zeros act as unused placeholders.
        dummy_labels = [0] * len(texts)
        dataloader = self._make_loader(texts, dummy_labels, batch_size)

        _, all_preds, all_probs, _ = self._run_inference(dataloader)
        return all_preds, all_probs

    def predict(self, text):
        """Predict the class and class probabilities of a single text.

        Returns:
            ``(prediction, probabilities)`` for ``text``, or ``(None, None)``
            if no prediction was produced.
        """
        predictions, probabilities = self.predict_batch([text])
        if predictions is not None and len(predictions) > 0:
            return predictions[0], probabilities[0]
        return None, None

# Module-level manager instance, constructed when this module is imported.
lstm_model_manager = LSTMModelManager(
    model_save_path='model_pro/lstm_model.pt',
    bert_model_path='model_pro/bert_model',
)

# Script entry point: train on the CSV splits, evaluate, then run a few sample predictions.
if __name__ == "__main__":
    # Read all three splits up front so a missing file fails before training.
    csv_paths = {
        'train': 'model_pro/train.csv',
        'dev': 'model_pro/dev.csv',
        'test': 'model_pro/test.csv',
    }
    frames = {split: pd.read_csv(path) for split, path in csv_paths.items()}

    # Pull the text/label columns out of each split as arrays.
    train_texts, train_labels = frames['train']['text'].values, frames['train']['label'].values
    dev_texts, dev_labels = frames['dev']['text'].values, frames['dev']['label'].values
    test_texts, test_labels = frames['test']['text'].values, frames['test']['label'].values

    # Train with the dev split as the validation set.
    lstm_model_manager.train(
        train_texts,
        train_labels,
        val_texts=dev_texts,
        val_labels=dev_labels,
        batch_size=32,
        epochs=5,
    )

    # Evaluate on the held-out test split.
    results = lstm_model_manager.evaluate(test_texts, test_labels)

    # Smoke-test single-sentence prediction.
    test_sentences = [
        "这件事情做得非常好",
        "服务太差了,态度恶劣",
        "这个产品质量一般,但价格便宜",
        "我对这家公司非常满意",
    ]

    for sentence in test_sentences:
        pred, prob = lstm_model_manager.predict(sentence)
        # Class 0 is reported with the first label, any other class with the second.
        if pred == 0:
            label = '良好'
        else:
            label = '不良'
        confidence = prob[pred]
        print(f"句子: '{sentence}' 预测结果: {label} (置信度: {confidence:.2%})")