user.py
13.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
import time
import hashlib
from flask import Blueprint, redirect, render_template, request, Flask, session, current_app, make_response
from datetime import datetime, timedelta
import re
from utils.query import query
from utils.errorResponse import errorResponse
from utils.logger import app_logger as logging
from functools import wraps
import secrets
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import redis
import json
import bleach
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
import html
# 创建Argon2密码哈希器
ph = PasswordHasher()
# Redis连接
redis_client = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 创建限流器
limiter = Limiter(
key_func=get_remote_address,
default_limits=["200 per day", "50 per hour"]
)
ub = Blueprint('user',
__name__,
url_prefix='/user',
template_folder='templates')
def sanitize_input(text):
"""清理用户输入,防止XSS攻击"""
if text is None:
return None
return bleach.clean(str(text), strip=True)
def validate_csrf_token():
"""验证CSRF令牌"""
token = request.form.get('csrf_token')
stored_token = session.get('csrf_token')
if not token or not stored_token or token != stored_token:
return False
return True
def get_client_info():
"""获取客户端信息"""
return {
'ip': request.remote_addr,
'user_agent': str(request.user_agent.string),
'platform': str(request.user_agent.platform),
'browser': str(request.user_agent.browser),
}
def is_suspicious_ip(ip):
"""检查IP是否可疑"""
key = f"login_attempts:{ip}"
attempts = redis_client.get(key)
if attempts and int(attempts) >= 5: # 5次失败尝试
return True
return False
def record_failed_attempt(ip):
"""记录失败的登录尝试"""
key = f"login_attempts:{ip}"
pipe = redis_client.pipeline()
pipe.incr(key)
pipe.expire(key, 1800) # 30分钟后重置
pipe.execute()
def clear_login_attempts(ip):
"""清除登录尝试记录"""
redis_client.delete(f"login_attempts:{ip}")
def set_secure_headers(response):
"""设置安全响应头"""
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'SAMEORIGIN'
response.headers['X-XSS-Protection'] = '1; mode=block'
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
response.headers['Content-Security-Policy'] = "default-src 'self'"
return response
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'username' not in session:
return redirect('/user/login')
# 验证会话完整性
if 'client_info' not in session or 'session_id' not in session:
session.clear()
return redirect('/user/login')
# 验证客户端信息
current_client = get_client_info()
stored_client = session['client_info']
if (current_client['ip'] != stored_client['ip'] or
current_client['user_agent'] != stored_client['user_agent']):
session.clear()
return redirect('/user/login')
# 验证会话ID
stored_session_id = redis_client.get(f"session:{session['username']}")
if not stored_session_id or stored_session_id != session['session_id']:
session.clear()
return redirect('/user/login')
return f(*args, **kwargs)
return decorated_function
def hash_password(password: str) -> str:
"""
使用Argon2id算法哈希密码
:param password: 用户输入的密码
:return: 哈希后的密码
"""
return ph.hash(password)
def verify_password(stored_hash: str, password: str) -> bool:
"""
验证密码
:param stored_hash: 存储的密码哈希
:param password: 用户输入的密码
:return: 是否匹配
"""
try:
return ph.verify(stored_hash, password)
except VerifyMismatchError:
return False
def validate_password(password: str) -> bool:
"""
验证密码强度
"""
if len(password) < 12: # 增加最小长度要求
return False
if not re.search(r"[A-Z]", password):
return False
if not re.search(r"[a-z]", password):
return False
if not re.search(r"\d", password):
return False
if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", password):
return False
# 检查常见密码模式
common_patterns = ['password', '123456', 'qwerty']
if any(pattern in password.lower() for pattern in common_patterns):
return False
return True
@ub.route('/login', methods=['GET', 'POST'])
@limiter.limit("5 per minute")
def login():
"""处理用户登录请求"""
if request.method == 'GET':
response = make_response(render_template('login_and_register.html'))
return set_secure_headers(response)
try:
if request.method == 'POST' and not validate_csrf_token():
logging.warning("CSRF验证失败")
return errorResponse('无效的请求')
client_ip = request.remote_addr
if is_suspicious_ip(client_ip):
logging.warning(f"可疑IP尝试登录: {client_ip}")
return errorResponse('由于多次失败尝试,请30分钟后再试')
username = sanitize_input(request.form.get('username'))
password = request.form.get('password') # 密码不需要sanitize
if not username or not password:
logging.warning("登录失败:用户名或密码为空")
return errorResponse('用户名和密码不能为空')
# 查询用户信息
sql = "SELECT password, status FROM user WHERE username = %s"
result = query(sql, [username], "select")
if result:
stored_password = result[0]['password']
status = result[0]['status']
if status != 'active':
logging.warning(f"已禁用的账户尝试登录: {username}")
return errorResponse('账户已被禁用')
if verify_password(stored_password, password):
session.clear()
session.regenerate()
# 生成唯一会话ID
session_id = secrets.token_hex(32)
client_info = get_client_info()
# 存储会话信息
session['username'] = username
session['login_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
session['csrf_token'] = secrets.token_hex(32)
session['client_info'] = client_info
session['session_id'] = session_id
session.permanent = True
current_app.permanent_session_lifetime = timedelta(hours=2)
# 在Redis中存储会话ID
redis_client.setex(
f"session:{username}",
int(current_app.permanent_session_lifetime.total_seconds()),
session_id
)
clear_login_attempts(client_ip)
# 记录登录历史
login_history_sql = '''
INSERT INTO login_history
(username, login_time, ip_address, user_agent, success, attempt_count)
VALUES (%s, %s, %s, %s, %s, %s)
'''
query(login_history_sql, [
username,
datetime.now(),
client_info['ip'],
client_info['user_agent'],
True,
redis_client.get(f"login_attempts:{client_ip}") or 0
])
logging.info(f"用户 {username} 登录成功")
response = make_response(redirect('/page/home'))
return set_secure_headers(response)
record_failed_attempt(client_ip)
logging.warning(f"登录失败:用户名或密码错误")
return errorResponse('用户名或密码错误')
except Exception as e:
logging.error(f"登录过程发生错误: {e}")
return errorResponse('登录失败,请稍后重试')
@ub.route('/register', methods=['GET', 'POST'])
@limiter.limit("3 per hour")
def register():
if request.method == 'GET':
response = make_response(render_template('login_and_register.html'))
return set_secure_headers(response)
try:
if request.method == 'POST' and not validate_csrf_token():
logging.warning("CSRF验证失败")
return errorResponse('无效的请求')
username = sanitize_input(request.form.get('username'))
password = request.form.get('password')
email = sanitize_input(request.form.get('email'))
if not username or not password or not email:
return errorResponse('用户名、密码和邮箱不能为空')
# 验证用户名格式
if not re.match(r'^[a-zA-Z0-9_]{4,20}$', username):
return errorResponse('用户名只能包含字母、数字和下划线,长度4-20位')
# 验证邮箱格式
if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email):
return errorResponse('邮箱格式不正确')
# 验证密码强度
if not validate_password(password):
return errorResponse('密码必须包含大小写字母、数字和特殊字符,且长度至少12位')
try:
# 检查用户名和邮箱是否存在
check_sql = """
SELECT
(SELECT COUNT(*) FROM user WHERE LOWER(username) = LOWER(%s)) as username_count,
(SELECT COUNT(*) FROM user WHERE LOWER(email) = LOWER(%s)) as email_count
"""
result = query(check_sql, [username.lower(), email.lower()], "select")
if result[0]['username_count'] > 0:
return errorResponse('该用户名已被注册')
if result[0]['email_count'] > 0:
return errorResponse('该邮箱已被注册')
# 哈希密码
hashed_password = hash_password(password)
# 插入新用户
insert_sql = '''
INSERT INTO user(username, password, email, status, createTime, last_password_change)
VALUES(%s, %s, %s, %s, %s, %s)
'''
current_time = datetime.now()
query(insert_sql, [
username,
hashed_password,
email,
'active',
current_time,
current_time
])
# 记录注册信息
client_info = get_client_info()
register_history_sql = '''
INSERT INTO register_history
(username, register_time, ip_address, user_agent, email)
VALUES (%s, %s, %s, %s, %s)
'''
query(register_history_sql, [
username,
current_time,
client_info['ip'],
client_info['user_agent'],
email
])
logging.info(f"新用户注册成功: {username}")
response = make_response(redirect('/user/login'))
return set_secure_headers(response)
except Exception as e:
logging.error(f"注册过程发生错误: {e}")
return errorResponse('注册失败,请稍后重试')
except Exception as e:
logging.error(f"注册过程发生错误: {e}")
return errorResponse('注册失败,请稍后重试')
@ub.route('/logout')
@login_required
def logout():
"""用户登出"""
try:
username = session.get('username')
client_info = session.get('client_info', {})
# 记录登出历史
logout_history_sql = '''
INSERT INTO logout_history
(username, logout_time, ip_address, user_agent, session_id)
VALUES (%s, %s, %s, %s, %s)
'''
query(logout_history_sql, [
username,
datetime.now(),
client_info.get('ip'),
client_info.get('user_agent'),
session.get('session_id')
])
# 删除Redis中的会话
redis_client.delete(f"session:{username}")
session.clear()
logging.info(f"用户 {username} 成功登出")
response = make_response(redirect('/user/login'))
return set_secure_headers(response)
except Exception as e:
logging.error(f"登出过程发生错误: {e}")
response = make_response(redirect('/user/login'))
return set_secure_headers(response)