본 내용은 https://arxiv.org/abs/1409.3215 논문을 기반으로 구현한 코드입니다.
혹시 잘못된 부분이나 수정할 부분이 있다면 댓글로 알려주시면 감사하겠습니다.
데이터 전처리
spaCy 라이브러리 : 문장의 토큰화, 태깅 등 전처리 기능을 위한 라이브러리
!python -m spacy download en
!python -m spacy download de
import spacy
spacy_en = spacy.load('en_core_web_sm') # 영어 토큰화
spacy_de = spacy.load('de_core_news_sm') # 독일어 토큰화
- 토큰화 함수
# 독일어 문장을 토큰화한 후 순서를 뒤집는 함수
def tokenize_de(text):
return [token.text for token in spacy_de.tokenizer(text)][::-1]
# 영어 문장을 토큰화하는 함수
def tokenize_en(text):
return [token.text for token in spacy_en.tokenizer(text)]
Field 라이브러리를 이용해 데이터셋에 대한 전처리 내용을 명시
SRC : 독일어, TRC : 영어
from torchtext.data import Field, BucketIterator
SRC = Field(tokenize=tokenize_de, init_token="<sos>", eos_token="<eos>", lower=True)
TRG = Field(tokenize=tokenize_en, init_token="<sos>", eos_token="<eos>", lower=True)
코드 구현 기준 시점에서는 라이브러리에서 데이터를 제공하지 않아 데이터를 다운받아 업로드 진행
from torchtext.datasets import Multi30k
train_data, valid_data, test_data = Multi30k.splits(exts=(".de", ".en"), fields=(SRC, TRG), root="/content/drive/MyDrive/Paper Review Code/data")
인코더 아키텍처
import torch.nn as nn
# 인코더 아키텍처 정의
class Encoder(nn.Module):
def __init__(self, input_dim, embed_dim, hidden_dim, n_layers, dropout_ratio):
super().__init__()
# 임베딩은 원-핫 인코딩을 특정 차원의 임베딩으로 매핑하는 레이어
self.embedding = nn.Embedding(input_dim, embed_dim)
# LSTM 레이어
self.hidden_dim = hidden_dim
self.n_layers = n_layers
self.rnn = nn.LSTM(embed_dim, hidden_dim, n_layers, dropout=dropout_ratio)
# 드롭아웃
self.dropout = nn.Dropout(dropout_ratio)
# 인코더는 소스 문장을 입력으로 받아 context vector를 반환
def forward(self, src):
# src : [단어 개수, 배치 크기]
embedded = self.dropout(self.embedding(src))
# embedded : [단어 개수, 배치 크기, 임베딩 차원]
outputs, (hidden, cell) = self.rnn(embedded)
# outputs : [단어 개수, 배치 크기, 히든 차원] => 현재 단어의 출력 정보
# hidden : [레이어 개수, 배치 크기, 히든 차원] => 현재까지의 모든 단어의 정보
# cell : [레이어 개수, 배치 크기, 히든 차원] => 현재까지의 모든 단어의 정보
# context vector 반환
return hidden, cell
디코더 아키텍처
# 디코더 아키텍처 정의
class Decoder(nn.Module):
def __init__(self, output_dim, embed_dim, hidden_dim, n_layers, dropout_ratio):
super().__init__()
# 임베딩은 원-핫 인코딩 말고 특정 차원의 임베딩으로 매핑하는 레이어
self.embedding = nn.Embedding(output_dim, embed_dim)
# LSTM 레이어
self.hidden_dim = hidden_dim
self.n_layers = n_layers
self.rnn = nn.LSTM(embed_dim, hidden_dim, n_layers, dropout=dropout_ratio)
# FC 레이어 (인코더와 다른 부분)
self.output_dim = output_dim
self.fc_out = nn.Linear(hidden_dim, output_dim)
# 드롭아웃
self.dropout = nn.Dropout(dropout_ratio)
# 디코더는 현재까지 출력된 문장에 대한 정보를 입력으로 받아 타겟 문장을 반환
def forward(self, input, hidden, cell):
# input : [배치 크기] => 단어의 개수는 항상 1개
# hidden : [레이어 개수, 배치 크기, 히든 차원]
# cell : [레이어 개수, 배치 크기, 히든 차원]
input = input.unsqueeze(0)
# input : [단어 개수 = 1, 배치 크기]
embedded = self.dropout(self.embedding(input))
# embedded : [단어 개수, 배치 크기, 임베딩 차원]
output, (hidden, cell) = self.rnn(embedded, (hidden, cell))
# output : [단어 개수 = 1, 배치 크기, 히든 차원] => 현재 단어의 출력 정보
# hidden : [레이어 개수, 배치 크기, 히든 차원] => 현재까지의 모든 단어 정보
# cell : [레이어 개수, 배치 크기, 히든 차원] => 현재까지의 모든 단어 정보
# 단어 개수는 1개이므로 차원 제거
prediction = self.fc_out(output.squeeze(0))
# prediction = [배치 크기, 출력 차원]
# (현재 출력 단어, 현재까지의 모든 단어 정보, 현재까지의 모든 단어 정보)
return prediction, hidden, cell
Seq2Seq 아키텍처
class Seq2Seq(nn.Module):
def __init__(self, encoder, decoder, device):
super().__init__()
self.encoder = encoder
self.decoder = decoder
self.device = device
# 학습할 때는 완전한 형태의 소스 문장, 타겟 문장, teacher_forcing_ratio를 넣기
def forward(self, src, trg, teacher_forcing_ratio=0.5):
# src : [단어 개수, 배치 크기]
# trg : [단어 개수, 배치 크기]
# 먼저 인코더를 거쳐 context vector를 추출
hidden, cell = self.encoder(src)
# 디코더의 최종 결과를 담을 텐서 객체 만들기
trg_len = trg.shape[0] # 단어 개수
batch_size = trg.shape[1] # 배치 크기
trg_vocab_size = self.decoder.output_dim # 출력 차원
outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)
# 첫 번째 입력은 항상 <sos> 토큰
input = trg[0, :]
# 타겟 단어의 개수만큼 반복하여 디코더에 포워딩(forwarding)
for t in range(1, trg_len):
output, hidden, cell = self.decoder(input, hidden, cell)
outputs[t] = output # FC를 거쳐서 나온 현재의 출력 단어 정보
top1 = output.argmax(1) # 가장 확률이 높은 단어의 인덱스 추출
# teacher_forcing_ratio : 학습할 때 실제 목표 출력(ground-truth)을 사용하는 비율
teacher_force = random.random() < teacher_forcing_ratio
input = trg[t] if teacher_force else top1 # 현재의 출력 결과를 다음 입력에서 넣기
return outputs
Train 학습
input_dim = len(SRC.vocab)
output_dim = len(TRG.vocab)
encoder_embed_dim = 256
decoder_embed_dim = 256
hidden_dim = 512
n_layers = 2
enc_dropout_ratio = 0.5
dec_dropout_ratio = 0.5
# 인코더와 디코더 객체 선언
enc = Encoder(input_dim, encoder_embed_dim, hidden_dim, n_layers, enc_dropout_ratio)
dec = Decoder(output_dim, decoder_embed_dim, hidden_dim, n_layers, dec_dropout_ratio)
# Seq2Seq 객체 선언
model = Seq2Seq(enc, dec, device).to(device)
# 모델 학습 함수
def train(model, iterator, optimizer, criterion, clip):
model.train()
epoch_loss = 0
# 전체 학습 데이터를 확인
for i, batch in enumerate(iterator):
src = batch.src
trg = batch.trg
optimizer.zero_grad()
output = model(src, trg)
# output : [출력 단어 개수, 배치 크기, 출력 차원]
ouput_dim = output.shape[-1]
# 출력 단어의 인덱스 0은 사용하지 않음
output = output[1:].view(-1, output_dim)
# output = [(출력 단어의 개수 - 1) * batch size, output dim]
trg = trg[1:].view(-1)
# trg = [(타겟 단어의 개수 - 1) * batch size]
# 모델의 출력 결과와 타겟 문장을 비교하여 손실 함수 계산
loss = criterion(output, trg)
loss.backward() # 기울기(gradient) 계산
# 기울기 clipping 진행
torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
# 파라미터 업데이트
optimizer.step()
# 전체 손실 값 계산
epoch_loss += loss.item()
return epoch_loss / len(iterator)
# 모델 평가 함수
def evaluate(model, iterator, criterion):
model.eval() # 평가 모드
epoch_loss = 0
with torch.no_grad():
# 전체 평가 데이터를 확인하며
for i, batch in enumerate(iterator):
src = batch.src
trg = batch.trg
# 평가할 때 teacher forcing는 사용하지 않음
output = model(src, trg, 0)
# output: [출력 단어 개수, 배치 크기, 출력 차원]
output_dim = output.shape[-1]
# 출력 단어의 인덱스 0은 사용하지 않음
output = output[1:].view(-1, output_dim)
# output = [(출력 단어의 개수 - 1) * batch size, output dim]
trg = trg[1:].view(-1)
# trg = [(타겟 단어의 개수 - 1) * batch size]
# 모델의 출력 결과와 타겟 문장을 비교하여 손실 계산
loss = criterion(output, trg)
# 전체 손실 값 계산
epoch_loss += loss.item()
return epoch_loss / len(iterator)
import time
import math
import random
N_EPOCHS = 20
CLIP = 1
best_valid_loss = float('inf')
for epoch in range(N_EPOCHS):
start_time = time.time() # 시작 시간 기록
train_loss = train(model, train_iterator, optimizer, criterion, CLIP)
valid_loss = evaluate(model, valid_iterator, criterion)
end_time = time.time() # 종료 시간 기록
epoch_mins, epoch_secs = epoch_time(start_time, end_time)
if valid_loss < best_valid_loss:
best_valid_loss = valid_loss
torch.save(model.state_dict(), 'seq2seq.pt')
print(f'Epoch: {epoch + 1:02} | Time: {epoch_mins}m {epoch_secs}s')
print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):.3f}')
print(f'\tValidation Loss: {valid_loss:.3f} | Validation PPL: {math.exp(valid_loss):.3f}')
'Papers with Code > Natural Language Processing' 카테고리의 다른 글
[Paper Review Code] Attention Is All You Need (2) (0) | 2024.03.11 |
---|---|
[Paper Review Code] Attention Is All You Need (1) (0) | 2024.03.10 |