predict.py
8.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
import torch
import pandas as pd
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm
import os
import sys
import json
import chardet
# 导入改进版模型的组件
from model_pro.MHA import MultiHeadAttentionLayer
from model_pro.classifier import FinalClassifier
from model_pro.BERT_CTM import BERT_CTM_Model
class ModelManager:
_instance = None
_initialized = False
def __new__(cls):
if cls._instance is None:
cls._instance = super(ModelManager, cls).__new__(cls)
return cls._instance
def __init__(self):
if not self._initialized:
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.classifier_model = None
self.attention_model = None
self.bert_ctm_model = None
self._initialized = True
def load_models(self, model_save_path, bert_model_path, ctm_tokenizer_path):
"""加载所有需要的模型"""
try:
if self.classifier_model is None:
self.classifier_model = torch.load(model_save_path, map_location=self.device)
self.classifier_model.eval()
if self.attention_model is None:
self.attention_model = MultiHeadAttentionLayer(embed_size=768, num_heads=8)
self.attention_model.to(self.device)
self.attention_model.eval()
if self.bert_ctm_model is None:
self.bert_ctm_model = BERT_CTM_Model(
bert_model_path=bert_model_path,
ctm_tokenizer_path=ctm_tokenizer_path
)
return True
except Exception as e:
print(f"模型加载失败: {e}")
return False
def predict_batch(self, texts, batch_size=32):
"""批量预测文本情感"""
try:
all_predictions = []
all_probabilities = []
# 分批处理文本
for i in range(0, len(texts), batch_size):
batch_texts = texts[i:i + batch_size]
# 获取文本嵌入
embeddings = self.bert_ctm_model.get_bert_embeddings(batch_texts)
# 转换为tensor
batch_x = torch.tensor(embeddings, dtype=torch.float32).to(self.device)
batch_x = torch.mean(batch_x, dim=1)
with torch.no_grad():
# 使用注意力机制
attention_output = self.attention_model(batch_x, batch_x, batch_x)
# 获取分类结果
outputs = self.classifier_model(attention_output)
outputs = torch.mean(outputs, dim=1)
# 获取预测概率
probabilities = torch.softmax(outputs, dim=1)
# 获取预测标签
_, predicted = torch.max(outputs, 1)
all_predictions.extend(predicted.cpu().numpy())
all_probabilities.extend(probabilities.cpu().numpy())
return all_predictions, all_probabilities
except Exception as e:
print(f"预测过程中出现错误: {e}")
return None, None
# 创建全局的模型管理器实例
model_manager = ModelManager()
def detect_file_encoding(file_path, num_bytes=10000):
"""
使用 chardet 检测文件的编码。
:param file_path: 文件路径
:param num_bytes: 用于检测的字节数
:return: 检测到的编码
"""
with open(file_path, 'rb') as f:
rawdata = f.read(num_bytes)
result = chardet.detect(rawdata)
encoding = result['encoding']
confidence = result['confidence']
print(f"检测到的编码: {encoding}, 置信度: {confidence}")
return encoding
def get_bert_ctm_embeddings(texts, bert_model_path, ctm_tokenizer_path, n_components=12, num_epochs=20):
# 创建BERT_CTM_Model实例
bert_ctm_model = BERT_CTM_Model(
bert_model_path=bert_model_path,
ctm_tokenizer_path=ctm_tokenizer_path,
n_components=n_components,
num_epochs=num_epochs
)
# 获取嵌入
embeddings = bert_ctm_model.get_bert_embeddings(texts)
return embeddings
def prepare_dataloader(features, batch_size):
tensor_x = torch.tensor(features, dtype=torch.float32)
dataset = TensorDataset(tensor_x)
return DataLoader(dataset, batch_size=batch_size, shuffle=False)
def predict(model_save_path, input_data_path, output_path, bert_model_path, ctm_tokenizer_path, stats_output_path,
batch_size=128,
num_classes=2):
try:
# 加载模型
print("加载模型...")
if not model_manager.load_models(model_save_path, bert_model_path, ctm_tokenizer_path):
return False
# 检测文件编码
encoding = detect_file_encoding(input_data_path)
# 读取输入数据
print("读取输入数据...")
data = pd.read_csv(input_data_path, encoding=encoding)
texts = data['TEXT'].tolist()
# 生成嵌入
print("生成文本嵌入...")
embeddings = get_bert_ctm_embeddings(texts, bert_model_path, ctm_tokenizer_path)
# 准备DataLoader
data_loader = prepare_dataloader(embeddings, batch_size)
# 存储预测结果
all_predictions = []
all_probabilities = []
print("开始预测...")
with torch.no_grad():
for batch in tqdm(data_loader, desc="预测进度"):
batch_x = batch[0].to(model_manager.device)
batch_x = torch.mean(batch_x, dim=1)
# 使用注意力机制
attention_output = model_manager.attention_model(batch_x, batch_x, batch_x)
# 获取分类结果
outputs = model_manager.classifier_model(attention_output)
outputs = torch.mean(outputs, dim=1)
# 获取预测概率
probabilities = torch.softmax(outputs, dim=1)
# 获取预测标签
_, predicted = torch.max(outputs, 1)
all_predictions.extend(predicted.cpu().numpy())
all_probabilities.extend(probabilities.cpu().numpy())
# 添加预测结果和概率到数据框
data['Predicted_Label'] = all_predictions
data['Confidence'] = [prob[pred] for prob, pred in zip(all_probabilities, all_predictions)]
# 保存预测结果
data.to_csv(output_path, index=False, encoding='utf-8')
print(f"预测结果已保存到 {output_path}")
# 统计标签的个数和占比
label_counts = data['Predicted_Label'].value_counts()
total_count = len(data)
stats = {
'统计信息': {
'总样本数': total_count,
'各类别统计': {}
}
}
for label, count in label_counts.items():
label_name = "良好" if label == 0 else "不良"
percentage = (count / total_count) * 100
confidence_mean = data[data['Predicted_Label'] == label]['Confidence'].mean()
stats['统计信息']['各类别统计'][label_name] = {
'数量': int(count),
'占比': f"{percentage:.2f}%",
'平均置信度': f"{confidence_mean:.2f}"
}
print(f"标签: {label_name}, 数量: {count}, 占比: {percentage:.2f}%, 平均置信度: {confidence_mean:.2f}")
# 将统计信息保存到 JSON 文件
with open(stats_output_path, 'w', encoding='utf-8') as f:
json.dump(stats, f, ensure_ascii=False, indent=4)
return True
except Exception as e:
print(f"预测过程中出现错误: {e}")
return False
if __name__ == "__main__":
if len(sys.argv) != 3:
print("使用方法: python predict.py <input_data_path> <stats_output_path>")
sys.exit(1)
input_data_path = sys.argv[1]
stats_output_path = sys.argv[2]
# 定义路径
model_save_path = 'model_pro/final_model.pt'
output_path = 'model_pro/predictions.csv'
bert_model_path = 'model_pro/bert_model'
ctm_tokenizer_path = 'model_pro/sentence_bert_model'
# 执行预测
success = predict(model_save_path, input_data_path, output_path, bert_model_path, ctm_tokenizer_path,
stats_output_path)
if success:
sys.exit(0)
else:
sys.exit(1)