본 내용은 https://arxiv.org/abs/1706.03762 논문을 기반으로 하고
https://github.com/ndb796/Deep-Learning-Paper-Review-and-Practice에서 Transformer 구현 코드를 참고하였습니다.
혹시 잘못된 부분이나 수정할 부분이 있다면 댓글로 알려주시면 감사하겠습니다.
Text 전처리와 Encoder 구현은 저번 글에서 알아보았으니 이젠 Decoder 부분을 구현하고 전체 Transformer를 이용해 학습해보는 것을 알아보겠다. (Encoder 부분은 이전 글 참고)
디코더
디코더는 인코더와 다르게 두 개의 attention 층을 가지고 있다.
Masked Multi-head self-attention : timestep마다 지난 출력과 예측한 현재 토큰만 사용하여 토큰을 생성
Encoder-decoder attention : 디코더의 중간 표현을 Query로 사용해서 인코더에서 나온 Key와 Value vector에 Multi-head attention을 수행
Decoder Layer Architecture
class DecoderLayer(nn.Module):
def __init__(self, hidden_dim, n_heads, pf_dim, dropout_ratio, device):
super().__init__()
self.self_attn_layer_norm = nn.LayerNorm(hidden_dim)
self.enc_attn_layer_norm = nn.LayerNorm(hidden_dim)
self.ff_layer_norm = nn.LayerNorm(hidden_dim)
self.self_attention = MultiHeadAttentionLayer(hidden_dim, n_heads, dropout_ratio, device)
self.encoder_attention = MultiHeadAttentionLayer(hidden_dim, n_heads, dropout_ratio, device)
self.positionwise_feedforward = PositionwiseFeedforwardLayer(hidden_dim, pf_dim, dropout_ratio)
self.dropout = nn.Dropout(dropout_ratio)
# 인코더의 출력 값(enc_src)을 어텐션(attention)하는 구조
def forward(self, trg, enc_src, trg_mask, src_mask):
# trg: [batch_size, trg_len, hidden_dim]
# enc_src: [batch_size, src_len, hidden_dim]
# trg_mask: [batch_size, trg_len]
# src_mask: [batch_size, src_len]
# self attention
# 자기 자신에 대하여 어텐션(attention)
_trg, _ = self.self_attention(trg, trg, trg, trg_mask)
# dropout, residual connection and layer norm
trg = self.self_attn_layer_norm(trg + self.dropout(_trg))
# trg: [batch_size, trg_len, hidden_dim]
# encoder attention
# 디코더의 쿼리(Query)를 이용해 인코더를 어텐션(attention)
_trg, attention = self.encoder_attention(trg, enc_src, enc_src, src_mask)
# dropout, residual connection and layer norm
trg = self.enc_attn_layer_norm(trg + self.dropout(_trg))
# trg: [batch_size, trg_len, hidden_dim]
# positionwise feedforward
_trg = self.positionwise_feedforward(trg)
# dropout, residual and layer norm
trg = self.ff_layer_norm(trg + self.dropout(_trg))
# trg: [batch_size, trg_len, hidden_dim]
# attention: [batch_size, n_heads, trg_len, src_len]
return trg, attention
최종 Transformer Decoder
class Decoder(nn.Module):
def __init__(self, output_dim, hidden_dim, n_layers, n_heads, pf_dim, dropout_ratio, device, max_length=100):
super().__init__()
self.device = device
self.tok_embedding = nn.Embedding(output_dim, hidden_dim)
self.pos_embedding = nn.Embedding(max_length, hidden_dim)
self.layers = nn.ModuleList([DecoderLayer(hidden_dim, n_heads, pf_dim, dropout_ratio, device) for _ in range(n_layers)])
self.fc_out = nn.Linear(hidden_dim, output_dim)
self.dropout = nn.Dropout(dropout_ratio)
self.scale = torch.sqrt(torch.FloatTensor([hidden_dim])).to(device)
def forward(self, trg, enc_src, trg_mask, src_mask):
# trg: [batch_size, trg_len]
# enc_src: [batch_size, src_len, hidden_dim]
# trg_mask: [batch_size, trg_len]
# src_mask: [batch_size, src_len]
batch_size = trg.shape[0]
trg_len = trg.shape[1]
pos = torch.arange(0, trg_len).unsqueeze(0).repeat(batch_size, 1).to(self.device)
# pos: [batch_size, trg_len]
trg = self.dropout((self.tok_embedding(trg) * self.scale) + self.pos_embedding(pos))
# trg: [batch_size, trg_len, hidden_dim]
for layer in self.layers:
# 소스 마스크와 타겟 마스크 모두 사용
trg, attention = layer(trg, enc_src, trg_mask, src_mask)
# trg: [batch_size, trg_len, hidden_dim]
# attention: [batch_size, n_heads, trg_len, src_len]
output = self.fc_out(trg)
# output: [batch_size, trg_len, output_dim]
return output, attention
최종 Transformer
class Transformer(nn.Module):
def __init__(self, encoder, decoder, src_pad_idx, trg_pad_idx, device):
super().__init__()
self.encoder = encoder
self.decoder = decoder
self.src_pad_idx = src_pad_idx
self.trg_pad_idx = trg_pad_idx
self.device = device
# 소스 문장의 <pad> 토큰에 대하여 마스크(mask) 값을 0으로 설정
def make_src_mask(self, src):
# src: [batch_size, src_len]
src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)
# src_mask: [batch_size, 1, 1, src_len]
return src_mask
# 타겟 문장에서 각 단어는 다음 단어가 무엇인지 알 수 없도록(이전 단어만 보도록) 만들기 위해 마스크를 사용
def make_trg_mask(self, trg):
# trg: [batch_size, trg_len]
""" (마스크 예시)
1 0 0 0 0
1 1 0 0 0
1 1 1 0 0
1 1 1 0 0
1 1 1 0 0
"""
trg_pad_mask = (trg != self.trg_pad_idx).unsqueeze(1).unsqueeze(2)
# trg_pad_mask: [batch_size, 1, 1, trg_len]
trg_len = trg.shape[1]
""" (마스크 예시)
1 0 0 0 0
1 1 0 0 0
1 1 1 0 0
1 1 1 1 0
1 1 1 1 1
"""
trg_sub_mask = torch.tril(torch.ones((trg_len, trg_len), device = self.device)).bool()
# trg_sub_mask: [trg_len, trg_len]
trg_mask = trg_pad_mask & trg_sub_mask
# trg_mask: [batch_size, 1, trg_len, trg_len]
return trg_mask
def forward(self, src, trg):
# src: [batch_size, src_len]
# trg: [batch_size, trg_len]
src_mask = self.make_src_mask(src)
trg_mask = self.make_trg_mask(trg)
# src_mask: [batch_size, 1, 1, src_len]
# trg_mask: [batch_size, 1, trg_len, trg_len]
enc_src = self.encoder(src, src_mask)
# enc_src: [batch_size, src_len, hidden_dim]
output, attention = self.decoder(trg, enc_src, trg_mask, src_mask)
# output: [batch_size, trg_len, output_dim]
# attention: [batch_size, n_heads, trg_len, src_len]
return output, attention
학습(Training)
Hyperparameter
INPUT_DIM = len(SRC.vocab)
OUTPUT_DIM = len(TRG.vocab)
HIDDEN_DIM = 256
ENC_LAYERS = 3
DEC_LAYERS = 3
ENC_HEADS = 8
DEC_HEADS = 8
ENC_PF_DIM = 512
DEC_PF_DIM = 512
ENC_DROPOUT = 0.1
DEC_DROPOUT = 0.1
SRC_PAD_IDX = SRC.vocab.stoi[SRC.pad_token]
TRG_PAD_IDX = TRG.vocab.stoi[TRG.pad_token]
# 인코더(encoder)와 디코더(decoder) 객체 선언
enc = Encoder(INPUT_DIM, HIDDEN_DIM, ENC_LAYERS, ENC_HEADS, ENC_PF_DIM, ENC_DROPOUT, device)
dec = Decoder(OUTPUT_DIM, HIDDEN_DIM, DEC_LAYERS, DEC_HEADS, DEC_PF_DIM, DEC_DROPOUT, device)
# Transformer 객체 선언
model = Transformer(enc, dec, SRC_PAD_IDX, TRG_PAD_IDX, device).to(device)
Train 함수, Eval 함수 설정
import torch.optim as optim
# Adam optimizer로 학습 최적화
LEARINING_RATE = 0.0005
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
# 뒷 부분의 패딩(padding)에 대해서는 값 무시
criterion = nn.CrossEntropyLoss(ignore_index = TRG_PAD_IDX)
# 모델 학습(train) 함수
def train(model, iterator, optimizer, criterion, clip):
model.train() # 학습 모드
epoch_loss = 0
# 전체 학습 데이터를 확인하며
for i, batch in enumerate(iterator):
src = batch.src
trg = batch.trg
optimizer.zero_grad()
# 출력 단어의 마지막 인덱스(<eos>)는 제외
# 입력을 할 때는 <sos>부터 시작하도록 처리
output, _ = model(src, trg[:,:-1])
# output: [배치 크기, trg_len - 1, output_dim]
# trg: [배치 크기, trg_len]
output_dim = output.shape[-1]
output = output.contiguous().view(-1, output_dim)
# 출력 단어의 인덱스 0(<sos>)은 제외
trg = trg[:,1:].contiguous().view(-1)
# output: [배치 크기 * trg_len - 1, output_dim]
# trg: [배치 크기 * trg len - 1]
# 모델의 출력 결과와 타겟 문장을 비교하여 손실 계산
loss = criterion(output, trg)
loss.backward() # 기울기(gradient) 계산
# 기울기(gradient) clipping 진행
torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
# 파라미터 업데이트
optimizer.step()
# 전체 손실 값 계산
epoch_loss += loss.item()
return epoch_loss / len(iterator)
# 모델 평가(evaluate) 함수
def evaluate(model, iterator, criterion):
model.eval() # 평가 모드
epoch_loss = 0
with torch.no_grad():
# 전체 평가 데이터를 확인하며
for i, batch in enumerate(iterator):
src = batch.src
trg = batch.trg
# 출력 단어의 마지막 인덱스(<eos>)는 제외
# 입력을 할 때는 <sos>부터 시작하도록 처리
output, _ = model(src, trg[:,:-1])
# output: [배치 크기, trg_len - 1, output_dim]
# trg: [배치 크기, trg_len]
output_dim = output.shape[-1]
output = output.contiguous().view(-1, output_dim)
# 출력 단어의 인덱스 0(<sos>)은 제외
trg = trg[:,1:].contiguous().view(-1)
# output: [배치 크기 * trg_len - 1, output_dim]
# trg: [배치 크기 * trg len - 1]
# 모델의 출력 결과와 타겟 문장을 비교하여 손실 계산
loss = criterion(output, trg)
# 전체 손실 값 계산
epoch_loss += loss.item()
return epoch_loss / len(iterator)
학습 진행
import math
import time
import random
def epoch_time(start_time, end_time):
elapsed_time = end_time - start_time
elapsed_mins = int(elapsed_time / 60)
elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
return elapsed_mins, elapsed_secs
N_EPOCHS = 10
CLIP = 1
best_valid_loss = float('inf')
for epoch in range(N_EPOCHS):
start_time = time.time() # 시작 시간 기록
train_loss = train(model, train_iterator, optimizer, criterion, CLIP)
valid_loss = evaluate(model, valid_iterator, criterion)
end_time = time.time() # 종료 시간 기록
epoch_mins, epoch_secs = epoch_time(start_time, end_time)
if valid_loss < best_valid_loss:
best_valid_loss = valid_loss
torch.save(model.state_dict(), 'transformer_german_to_english.pt')
print(f'Epoch: {epoch + 1:02} | Time: {epoch_mins}m {epoch_secs}s')
print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):.3f}')
print(f'\tValidation Loss: {valid_loss:.3f} | Validation PPL: {math.exp(valid_loss):.3f}')
'Papers with Code > Natural Language Processing' 카테고리의 다른 글
[Paper Review Code] Attention Is All You Need (1) (0) | 2024.03.10 |
---|---|
Sequence to Sequence Learning with Neural Networks (0) | 2024.01.04 |