본문 바로가기
Papers with Code/Natural Language Processing

Sequence to Sequence Learning with Neural Networks

by BangGeuk 2024. 1. 4.

본 내용은 https://arxiv.org/abs/1409.3215 논문을 기반으로 구현한 코드입니다.

혹시 잘못된 부분이나 수정할 부분이 있다면 댓글로 알려주시면 감사하겠습니다.


데이터 전처리

spaCy 라이브러리 : 문장의 토큰화, 태깅 등 전처리 기능을 위한 라이브러리

!python -m spacy download en
!python -m spacy download de
import spacy

spacy_en = spacy.load('en_core_web_sm') # 영어 토큰화
spacy_de = spacy.load('de_core_news_sm') # 독일어 토큰화

 

- 토큰화 함수

# 독일어 문장을 토큰화한 후 순서를 뒤집는 함수
def tokenize_de(text):
    return [token.text for token in spacy_de.tokenizer(text)][::-1]

# 영어 문장을 토큰화하는 함수
def tokenize_en(text):
    return [token.text for token in spacy_en.tokenizer(text)]

 

Field 라이브러리를 이용해 데이터셋에 대한 전처리 내용을 명시

SRC : 독일어, TRC : 영어

from torchtext.data import Field, BucketIterator

SRC = Field(tokenize=tokenize_de, init_token="<sos>", eos_token="<eos>", lower=True)
TRG = Field(tokenize=tokenize_en, init_token="<sos>", eos_token="<eos>", lower=True)

 

코드 구현 기준 시점에서는 라이브러리에서 데이터를 제공하지 않아 데이터를 다운받아 업로드 진행

from torchtext.datasets import Multi30k

train_data, valid_data, test_data = Multi30k.splits(exts=(".de", ".en"), fields=(SRC, TRG), root="/content/drive/MyDrive/Paper Review Code/data")

 

인코더 아키텍처

import torch.nn as nn

# 인코더 아키텍처 정의
class Encoder(nn.Module):
    def __init__(self, input_dim, embed_dim, hidden_dim, n_layers, dropout_ratio):
        super().__init__()

        # 임베딩은 원-핫 인코딩을 특정 차원의 임베딩으로 매핑하는 레이어
        self.embedding = nn.Embedding(input_dim, embed_dim)

        # LSTM 레이어
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.rnn = nn.LSTM(embed_dim, hidden_dim, n_layers, dropout=dropout_ratio)

        # 드롭아웃
        self.dropout = nn.Dropout(dropout_ratio)

    # 인코더는 소스 문장을 입력으로 받아 context vector를 반환
    def forward(self, src):
        # src : [단어 개수, 배치 크기]
        embedded = self.dropout(self.embedding(src))
        # embedded : [단어 개수, 배치 크기, 임베딩 차원]

        outputs, (hidden, cell) = self.rnn(embedded)
        # outputs : [단어 개수, 배치 크기, 히든 차원] => 현재 단어의 출력 정보
        # hidden : [레이어 개수, 배치 크기, 히든 차원] => 현재까지의 모든 단어의 정보
        # cell : [레이어 개수, 배치 크기, 히든 차원] => 현재까지의 모든 단어의 정보

        # context vector 반환
        return hidden, cell

 

디코더 아키텍처

# 디코더 아키텍처 정의
class Decoder(nn.Module):
    def __init__(self, output_dim, embed_dim, hidden_dim, n_layers, dropout_ratio):
        super().__init__()

        # 임베딩은 원-핫 인코딩 말고 특정 차원의 임베딩으로 매핑하는 레이어
        self.embedding = nn.Embedding(output_dim, embed_dim)

        # LSTM 레이어
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.rnn = nn.LSTM(embed_dim, hidden_dim, n_layers, dropout=dropout_ratio)

        # FC 레이어 (인코더와 다른 부분)
        self.output_dim = output_dim
        self.fc_out = nn.Linear(hidden_dim, output_dim)

        # 드롭아웃
        self.dropout = nn.Dropout(dropout_ratio)

    # 디코더는 현재까지 출력된 문장에 대한 정보를 입력으로 받아 타겟 문장을 반환
    def forward(self, input, hidden, cell):
        # input : [배치 크기] => 단어의 개수는 항상 1개
        # hidden : [레이어 개수, 배치 크기, 히든 차원]
        # cell : [레이어 개수, 배치 크기, 히든 차원]
        input = input.unsqueeze(0)
        # input : [단어 개수 = 1, 배치 크기]

        embedded = self.dropout(self.embedding(input))
        # embedded : [단어 개수, 배치 크기, 임베딩 차원]

        output, (hidden, cell) = self.rnn(embedded, (hidden, cell))
        # output : [단어 개수 = 1, 배치 크기, 히든 차원] => 현재 단어의 출력 정보
        # hidden : [레이어 개수, 배치 크기, 히든 차원] => 현재까지의 모든 단어 정보
        # cell : [레이어 개수, 배치 크기, 히든 차원] => 현재까지의 모든 단어 정보

        # 단어 개수는 1개이므로 차원 제거
        prediction = self.fc_out(output.squeeze(0))
        # prediction = [배치 크기, 출력 차원]

        # (현재 출력 단어, 현재까지의 모든 단어 정보, 현재까지의 모든 단어 정보)
        return prediction, hidden, cell

Seq2Seq 아키텍처

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    # 학습할 때는 완전한 형태의 소스 문장, 타겟 문장, teacher_forcing_ratio를 넣기
    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        # src : [단어 개수, 배치 크기]
        # trg : [단어 개수, 배치 크기]
        # 먼저 인코더를 거쳐 context vector를 추출
        hidden, cell = self.encoder(src)

        # 디코더의 최종 결과를 담을 텐서 객체 만들기
        trg_len = trg.shape[0] # 단어 개수
        batch_size = trg.shape[1] # 배치 크기
        trg_vocab_size = self.decoder.output_dim # 출력 차원
        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)

        # 첫 번째 입력은 항상 <sos> 토큰
        input = trg[0, :]

        # 타겟 단어의 개수만큼 반복하여 디코더에 포워딩(forwarding)
        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(input, hidden, cell)

            outputs[t] = output # FC를 거쳐서 나온 현재의 출력 단어 정보
            top1 = output.argmax(1) # 가장 확률이 높은 단어의 인덱스 추출

            # teacher_forcing_ratio : 학습할 때 실제 목표 출력(ground-truth)을 사용하는 비율
            teacher_force = random.random() < teacher_forcing_ratio
            input = trg[t] if teacher_force else top1 # 현재의 출력 결과를 다음 입력에서 넣기

        return outputs

Train 학습

input_dim = len(SRC.vocab)
output_dim = len(TRG.vocab)
encoder_embed_dim = 256
decoder_embed_dim = 256
hidden_dim = 512
n_layers = 2
enc_dropout_ratio = 0.5
dec_dropout_ratio = 0.5

# 인코더와 디코더 객체 선언
enc = Encoder(input_dim, encoder_embed_dim, hidden_dim, n_layers, enc_dropout_ratio)
dec = Decoder(output_dim, decoder_embed_dim, hidden_dim, n_layers, dec_dropout_ratio)

# Seq2Seq 객체 선언
model = Seq2Seq(enc, dec, device).to(device)

 

# 모델 학습 함수
def train(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0

    # 전체 학습 데이터를 확인
    for i, batch in enumerate(iterator):
        src = batch.src
        trg = batch.trg

        optimizer.zero_grad()

        output = model(src, trg)
        # output : [출력 단어 개수, 배치 크기, 출력 차원]
        ouput_dim = output.shape[-1]

        # 출력 단어의 인덱스 0은 사용하지 않음
        output = output[1:].view(-1, output_dim)
        # output = [(출력 단어의 개수 - 1) * batch size, output dim]
        trg = trg[1:].view(-1)
        # trg = [(타겟 단어의 개수 - 1) * batch size]

        # 모델의 출력 결과와 타겟 문장을 비교하여 손실 함수 계산
        loss = criterion(output, trg)
        loss.backward() # 기울기(gradient) 계산

        # 기울기 clipping 진행
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)

        # 파라미터 업데이트
        optimizer.step()

        # 전체 손실 값 계산
        epoch_loss += loss.item()

    return epoch_loss / len(iterator)
# 모델 평가 함수
def evaluate(model, iterator, criterion):
    model.eval() # 평가 모드
    epoch_loss = 0

    with torch.no_grad():
        # 전체 평가 데이터를 확인하며
        for i, batch in enumerate(iterator):
            src = batch.src
            trg = batch.trg

            # 평가할 때 teacher forcing는 사용하지 않음
            output = model(src, trg, 0)
            # output: [출력 단어 개수, 배치 크기, 출력 차원]
            output_dim = output.shape[-1]

            # 출력 단어의 인덱스 0은 사용하지 않음
            output = output[1:].view(-1, output_dim)
            # output = [(출력 단어의 개수 - 1) * batch size, output dim]
            trg = trg[1:].view(-1)
            # trg = [(타겟 단어의 개수 - 1) * batch size]

            # 모델의 출력 결과와 타겟 문장을 비교하여 손실 계산
            loss = criterion(output, trg)

            # 전체 손실 값 계산
            epoch_loss += loss.item()

    return epoch_loss / len(iterator)
import time
import math
import random

N_EPOCHS = 20
CLIP = 1
best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):
    start_time = time.time() # 시작 시간 기록

    train_loss = train(model, train_iterator, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, valid_iterator, criterion)

    end_time = time.time() # 종료 시간 기록
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'seq2seq.pt')

    print(f'Epoch: {epoch + 1:02} | Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):.3f}')
    print(f'\tValidation Loss: {valid_loss:.3f} | Validation PPL: {math.exp(valid_loss):.3f}')