BERT_CTM.py 3.63 KB
import os
from transformers.models.bert import BertTokenizer, BertModel
import torch
from tqdm import tqdm
import numpy as np
import jieba
from contextualized_topic_models.utils.data_preparation import TopicModelDataPreparation
from contextualized_topic_models.models.ctm import CombinedTM

class BERT_CTM_Model:
    def __init__(self, bert_model_path, ctm_tokenizer_path, n_components=12, num_epochs=50, device=None):
        # 确定设备 (CPU/GPU)
        self.device = device if device else ("cuda" if torch.cuda.is_available() else "cpu")
        
        # 检查模型路径是否存在
        if not os.path.exists(bert_model_path):
            raise ValueError(f"BERT模型路径不存在: {bert_model_path}")
        if not os.path.exists(ctm_tokenizer_path):
            raise ValueError(f"CTM分词器路径不存在: {ctm_tokenizer_path}")

        # 加载BERT模型和tokenizer
        self.tokenizer = BertTokenizer.from_pretrained(bert_model_path)
        self.model = BertModel.from_pretrained(bert_model_path).to(self.device)

        # 创建CTM数据预处理对象
        self.tp = TopicModelDataPreparation(ctm_tokenizer_path)
        self.n_components = n_components
        self.num_epochs = num_epochs
        self.ctm_model = None

    def get_bert_embeddings(self, texts):
        """使用BERT模型批量生成文本的嵌入向量"""
        embeddings = []
        for text in tqdm(texts, desc="Processing texts with BERT"):
            inputs = self.tokenizer(text, return_tensors="pt", padding="max_length", truncation=True, max_length=80).to(self.device)
            with torch.no_grad():
                outputs = self.model(**inputs)
            embeddings.append(outputs.last_hidden_state.cpu().numpy())  # [batch_size, sequence_length, hidden_size]
        return np.vstack(embeddings)

    def chinese_tokenize(self, text):
        """使用jieba对中文文本进行分词"""
        return " ".join(jieba.cut(text))

    def train_ctm(self, texts):
        """训练CTM模型"""
        try:
            # 分词并准备BOW文本
            bow_texts = [self.chinese_tokenize(text) for text in texts]
            training_dataset = self.tp.fit(text_for_contextual=texts, text_for_bow=bow_texts)

            # 训练CTM
            self.ctm_model = CombinedTM(bow_size=len(self.tp.vocab), contextual_size=768, 
                                        n_components=self.n_components, num_epochs=self.num_epochs)
            self.ctm_model.fit(training_dataset)
            print("CTM模型训练完成")
        except Exception as e:
            print(f"训练CTM模型时发生错误: {e}")

    def save_model(self, path):
        """保存训练后的CTM模型"""
        if self.ctm_model:
            self.ctm_model.save(path)
            print(f"CTM模型已保存至: {path}")
        else:
            print("未找到已训练的CTM模型,无法保存")

    def load_model(self, path):
        """加载已保存的CTM模型"""
        if os.path.exists(path):
            self.ctm_model = CombinedTM.load(path)
            print(f"CTM模型已加载自: {path}")
        else:
            print(f"无法加载模型,路径不存在: {path}")

if __name__ == "__main__":
    # 设定BERT和CTM模型的路径
    bert_model_path = './bert_model'
    ctm_tokenizer_path = './sentence_bert_model'
    
    # 初始化模型
    model = BERT_CTM_Model(bert_model_path, ctm_tokenizer_path)

    # 示例文本
    texts = ["这是第一个文本", "这是第二个文本"]
    
    # 训练CTM模型
    model.train_ctm(texts)

    # 保存CTM模型
    model.save_model('./trained_ctm_model')

    # 加载CTM模型
    model.load_model('./trained_ctm_model')