sensitive_filter.py
11.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
import re
import json
import os
import logging
from pathlib import Path
logger = logging.getLogger('sensitive_filter')
logger.setLevel(logging.INFO)
class SensitiveDataFilter:
"""
敏感数据过滤器 - 用于检测和屏蔽输出内容中的敏感信息
功能:
1. 自动识别并过滤手机号、邮箱、身份证号、信用卡号等敏感信息
2. 支持自定义敏感信息模式和替换文本
3. 提供批量处理和实时过滤功能
"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(SensitiveDataFilter, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
# 默认配置
self.config = {
'enabled': os.getenv('ENABLE_SENSITIVE_DATA_FILTER', 'true').lower() == 'true',
'patterns': {
'phone': r'\b1[3-9]\d{9}\b',
'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
'id_card': r'\b[1-9]\d{5}(19|20)\d{2}(0[1-9]|1[0-2])(0[1-9]|[12]\d|3[01])\d{3}[\dXx]\b',
'credit_card': r'\b\d{4}[ -]?\d{4}[ -]?\d{4}[ -]?\d{4}\b',
'address': r'(北京|上海|广州|深圳|天津|重庆|南京|杭州|武汉|成都|西安)市.*?(路|街|道|巷).*?(号)'
},
'replacements': {
'phone': '***********',
'email': '******@*****',
'id_card': '******************',
'credit_card': '****************',
'address': '[地址已隐藏]'
}
}
# 加载自定义配置
self._load_config()
# 编译正则表达式
self._compile_patterns()
self._initialized = True
logger.info("敏感数据过滤器初始化完成")
if self.config['enabled']:
logger.info(f"已启用以下类型的敏感数据过滤: {', '.join(self.config['patterns'].keys())}")
else:
logger.info("敏感数据过滤器已禁用")
def _load_config(self):
"""加载自定义配置"""
# 配置文件路径
data_dir = os.getenv('DATA_DIR', 'data')
config_path = os.path.join(data_dir, 'security', 'sensitive_filter.json')
if os.path.exists(config_path):
try:
with open(config_path, 'r', encoding='utf-8') as f:
custom_config = json.load(f)
# 更新配置
if 'enabled' in custom_config:
self.config['enabled'] = custom_config['enabled']
if 'patterns' in custom_config:
for key, pattern in custom_config['patterns'].items():
self.config['patterns'][key] = pattern
if 'replacements' in custom_config:
for key, replacement in custom_config['replacements'].items():
self.config['replacements'][key] = replacement
logger.info(f"已加载自定义敏感数据过滤配置: {config_path}")
except Exception as e:
logger.error(f"加载敏感数据过滤配置失败: {e}")
def _compile_patterns(self):
"""编译正则表达式"""
self.compiled_patterns = {}
for key, pattern in self.config['patterns'].items():
try:
self.compiled_patterns[key] = re.compile(pattern)
logger.debug(f"已编译敏感数据模式: {key} - {pattern}")
except re.error as e:
logger.error(f"编译敏感数据模式失败: {key} - {pattern}: {e}")
def filter_text(self, text):
"""
过滤文本中的敏感信息
参数:
text: 要过滤的文本
返回:
过滤后的文本
"""
if not self.config['enabled'] or not text:
return text
filtered_text = text
for key, pattern in self.compiled_patterns.items():
replacement = self.config['replacements'].get(key, '[FILTERED]')
filtered_text = pattern.sub(replacement, filtered_text)
return filtered_text
def filter_dict(self, data, *skip_keys):
"""
过滤字典中的敏感信息
参数:
data: 要过滤的字典
skip_keys: 要跳过的键(不进行过滤)
返回:
过滤后的字典
"""
if not self.config['enabled'] or not data:
return data
if not isinstance(data, dict):
if isinstance(data, str):
return self.filter_text(data)
return data
filtered_data = {}
for key, value in data.items():
if key in skip_keys:
filtered_data[key] = value
continue
if isinstance(value, dict):
filtered_data[key] = self.filter_dict(value, *skip_keys)
elif isinstance(value, list):
filtered_data[key] = [
self.filter_dict(item, *skip_keys) if isinstance(item, (dict, list)) else
self.filter_text(item) if isinstance(item, str) else item
for item in value
]
elif isinstance(value, str):
filtered_data[key] = self.filter_text(value)
else:
filtered_data[key] = value
return filtered_data
def filter_list(self, data, *skip_keys):
"""
过滤列表中的敏感信息
参数:
data: 要过滤的列表
skip_keys: 如果列表项是字典,要跳过的键
返回:
过滤后的列表
"""
if not self.config['enabled'] or not data:
return data
if not isinstance(data, list):
if isinstance(data, dict):
return self.filter_dict(data, *skip_keys)
if isinstance(data, str):
return self.filter_text(data)
return data
return [
self.filter_dict(item, *skip_keys) if isinstance(item, dict) else
self.filter_list(item, *skip_keys) if isinstance(item, list) else
self.filter_text(item) if isinstance(item, str) else item
for item in data
]
def is_sensitive_info(self, text, info_type=None):
"""
检查文本是否包含敏感信息
参数:
text: 要检查的文本
info_type: 指定要检查的敏感信息类型,如果为None则检查所有类型
返回:
包含敏感信息返回True,否则返回False
"""
if not self.config['enabled'] or not text:
return False
if info_type:
if info_type not in self.compiled_patterns:
logger.warning(f"未知的敏感信息类型: {info_type}")
return False
return bool(self.compiled_patterns[info_type].search(text))
for pattern in self.compiled_patterns.values():
if pattern.search(text):
return True
return False
def get_sensitive_info_types(self, text):
"""
获取文本中包含的敏感信息类型
参数:
text: 要检查的文本
返回:
包含的敏感信息类型列表
"""
if not self.config['enabled'] or not text:
return []
types = []
for key, pattern in self.compiled_patterns.items():
if pattern.search(text):
types.append(key)
return types
def enable(self):
"""启用敏感数据过滤器"""
self.config['enabled'] = True
logger.info("敏感数据过滤器已启用")
def disable(self):
"""禁用敏感数据过滤器"""
self.config['enabled'] = False
logger.info("敏感数据过滤器已禁用")
def is_enabled(self):
"""检查敏感数据过滤器是否启用"""
return self.config['enabled']
def add_pattern(self, key, pattern, replacement='[FILTERED]'):
"""
添加自定义敏感信息模式
参数:
key: 敏感信息类型标识
pattern: 正则表达式字符串
replacement: 替换文本
"""
try:
# 测试是否是有效的正则表达式
re.compile(pattern)
# 更新配置
self.config['patterns'][key] = pattern
self.config['replacements'][key] = replacement
# 重新编译正则表达式
self._compile_patterns()
logger.info(f"已添加敏感信息模式: {key}")
return True
except re.error as e:
logger.error(f"添加敏感信息模式失败: {key} - {pattern}: {e}")
return False
def remove_pattern(self, key):
"""
移除敏感信息模式
参数:
key: 敏感信息类型标识
"""
if key in self.config['patterns']:
del self.config['patterns'][key]
if key in self.config['replacements']:
del self.config['replacements'][key]
if key in self.compiled_patterns:
del self.compiled_patterns[key]
logger.info(f"已移除敏感信息模式: {key}")
return True
logger.warning(f"未找到敏感信息模式: {key}")
return False
def save_config(self):
"""保存当前配置到文件"""
data_dir = os.getenv('DATA_DIR', 'data')
security_dir = os.path.join(data_dir, 'security')
os.makedirs(security_dir, exist_ok=True)
config_path = os.path.join(security_dir, 'sensitive_filter.json')
try:
with open(config_path, 'w', encoding='utf-8') as f:
json.dump(self.config, f, ensure_ascii=False, indent=2)
logger.info(f"敏感数据过滤配置已保存到: {config_path}")
return True
except Exception as e:
logger.error(f"保存敏感数据过滤配置失败: {e}")
return False
# 创建全局敏感数据过滤器实例
sensitive_filter = SensitiveDataFilter()
# 提供便捷的过滤函数
def filter_text(text):
"""过滤文本中的敏感信息"""
return sensitive_filter.filter_text(text)
def filter_dict(data, *skip_keys):
"""过滤字典中的敏感信息"""
return sensitive_filter.filter_dict(data, *skip_keys)
def filter_list(data, *skip_keys):
"""过滤列表中的敏感信息"""
return sensitive_filter.filter_list(data, *skip_keys)
def is_sensitive_info(text, info_type=None):
"""检查文本是否包含敏感信息"""
return sensitive_filter.is_sensitive_info(text, info_type)
# 示例用法
if __name__ == "__main__":
# 测试文本
test_text = """
联系人: 张三
电话: 13812345678
邮箱: zhangsan@example.com
身份证: 110101199001011234
地址: 北京市海淀区中关村大街20号
信用卡: 6225 1234 5678 9012
"""
# 过滤敏感信息
filtered_text = filter_text(test_text)
print("原始文本:")
print(test_text)
print("\n过滤后:")
print(filtered_text)
# 检查敏感信息类型
types = sensitive_filter.get_sensitive_info_types(test_text)
print(f"\n包含的敏感信息类型: {types}")