MHA.py
import torch
import torch.nn as nn


class MultiHeadAttentionLayer(nn.Module):
    def __init__(self, embed_size, num_heads, dropout_rate=0.1):
        super(MultiHeadAttentionLayer, self).__init__()
        self.embed_size = embed_size
        self.num_heads = num_heads
        self.head_dim = embed_size // num_heads
        assert self.head_dim * num_heads == embed_size, "Embedding size needs to be divisible by num_heads"
        # Linear projections for Q, K, V
        self.q_linear = nn.Linear(embed_size, embed_size)
        self.k_linear = nn.Linear(embed_size, embed_size)
        self.v_linear = nn.Linear(embed_size, embed_size)
        # Final output projection
        self.fc_out = nn.Linear(embed_size, embed_size)
        # Dropout and LayerNorm
        self.dropout = nn.Dropout(p=dropout_rate)
        self.layer_norm = nn.LayerNorm(embed_size)

    def forward(self, values, keys, query, mask=None):
        N = query.shape[0]  # batch size
        # Project the inputs to Q, K, V
        Q = self.q_linear(query)   # (N, seq_len_q, embed_size)
        K = self.k_linear(keys)    # (N, seq_len_k, embed_size)
        V = self.v_linear(values)  # (N, seq_len_k, embed_size)
        # Split Q, K, V into multiple heads
        Q = Q.reshape(N, -1, self.num_heads, self.head_dim)  # (N, seq_len_q, num_heads, head_dim)
        K = K.reshape(N, -1, self.num_heads, self.head_dim)  # (N, seq_len_k, num_heads, head_dim)
        V = V.reshape(N, -1, self.num_heads, self.head_dim)  # (N, seq_len_k, num_heads, head_dim)
        # Scaled dot-product attention scores
        attention_scores = torch.einsum("nqhd,nkhd->nhqk", [Q, K])  # (N, num_heads, seq_len_q, seq_len_k)
        attention_scores = attention_scores / (self.head_dim ** 0.5)  # scale by sqrt(head_dim)
        if mask is not None:
            attention_scores = attention_scores.masked_fill(mask == 0, float("-1e20"))
        attention = torch.softmax(attention_scores, dim=-1)  # normalize over the key dimension
        # Weight V by the attention distribution
        out = torch.einsum("nhql,nlhd->nqhd", [attention, V])  # (N, seq_len_q, num_heads, head_dim)
        out = out.reshape(N, -1, self.embed_size)  # concatenate the heads back to embed_size
        # Final output projection
        out = self.fc_out(out)
        # Residual connection followed by LayerNorm
        out = self.layer_norm(out + query)
        # Dropout on the sublayer output
        out = self.dropout(out)
        return out
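

# --- Minimal usage sketch (not part of the original file; shapes and names below are
# illustrative assumptions). It shows self-attention with a causal mask. The mask shape
# (1, 1, seq_len, seq_len) is chosen to broadcast against the (N, num_heads, seq_len_q,
# seq_len_k) score tensor; any broadcastable 0/1 mask would work the same way.
if __name__ == "__main__":
    batch_size, seq_len, embed_size, num_heads = 2, 5, 64, 8
    layer = MultiHeadAttentionLayer(embed_size, num_heads, dropout_rate=0.1)
    x = torch.randn(batch_size, seq_len, embed_size)
    # Causal (lower-triangular) mask: position q may only attend to keys k <= q
    causal_mask = torch.tril(torch.ones(seq_len, seq_len)).view(1, 1, seq_len, seq_len)
    out = layer(values=x, keys=x, query=x, mask=causal_mask)  # self-attention
    print(out.shape)  # torch.Size([2, 5, 64])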