본문 바로가기
Papers with Code/Natural Language Processing

[Paper Review Code] Attention Is All You Need (1)

by BangGeuk 2024. 3. 10.

본 내용은 https://arxiv.org/abs/1706.03762 논문을 기반으로 하고

https://github.com/ndb796/Deep-Learning-Paper-Review-and-Practice에서 Transformer 구현 코드를 참고하였습니다.

혹시 잘못된 부분이나 수정할 부분이 있다면 댓글로 알려주시면 감사하겠습니다.


Transformer는 크게 인코더-디코더 아키텍처를 가진다. 아래 그림을 보면 상단이 인코더, 하단이 디코더이다.

이미지 출처 : https://github.com/rickiepark/nlp-with-transformers/blob/main/03_transformer-anatomy.ipynb

 

인코더

인코더 층을 자세히 보면 아래 그림과 같다.

이미지 출처 : https://github.com/rickiepark/nlp-with-transformers/blob/main/03_transformer-anatomy.ipynb

 

토큰화

import spacy

spacy_en = spacy.load('en_core_web_sm') # 영어 토큰화
spacy_de = spacy.load('de_core_news_sm') # 독일어 토큰화

# 독일어 문장을 토큰화 하는 함수
def tokenize_de(text):
    return [token.text for token in spacy_de.tokenizer(text)]

# 영어 문장을 토큰화 하는 함수
def tokenize_en(text):
    return [token.text for token in spacy_en.tokenizer(text)]

 

 

데이터 전처리

from torchtext.data import Field, BucketIterator

SRC = Field(tokenize=tokenize_de, init_token="<sos>", eos_token="<eos>", lower=True, batch_first=True)
TRG = Field(tokenize=tokenize_en, init_token="<sos>", eos_token="<eos>", lower=True, batch_first=True)

from torchtext.datasets import Multi30k

train_data, valid_data, test_data = Multi30k.splits(exts=(".de", ".en"), fields=(SRC, TRG), root='.../data')

Field 라이브러리를 이용해 데이터셋에 대한 전처리 내용을 명시한다.

dataset은 Multi30k를 이용하며 본 코드는 직접 Multi30k 데이터셋을 다운받아 이용한 것이다.

 

vocab 생성

SRC.build_vocab(train_data, min_freq=2)
TRG.build_vocab(train_data, min_freq=2)

Field 객체의 build_vocab 메서드를 이용해 영어와 독일어 vocab 생성 (최소 2번 이상 등장한 단어만 선택)

 

입력 sequence 길이 맞추기

import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

batch_size = 128

train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size=batch_size,
    device=device)

하나의 배치에 포함된 문장들이 가지는 단어의 개수가 유사하게 만들면 좋음

BucketIterator는 이용해 sequence 길이에 따라 데이터를 정렬하고 비슷한 길이의 시퀀스들을 함께 묶어주는 역할

 

어텐션 점수 계산 함수

def scaled_dot_product_attention(query, key, value):
    dim_k = query.size(-1)
    scores = torch.bmm(query, key.transpose(1, 2)) / sqrt(dim_k)
    weights = F.softmax(scores, dim=-1)
    return torch.bmm(weights, value)

softmax((query*key^T)/루트(차원))*value

 

MultiHeadAttention

import torch.nn as nn

class MultiHeadAttentionLayer(nn.Module):
    def __init__(self, hidden_dim, n_heads, dropout_ratio, device):
        super().__init__()

        assert hidden_dim % n_heads == 0

        self.hidden_dim = hidden_dim # 임베딩 차원
        self.n_heads = n_heads # 헤드의 개수 : 서로 다른 어텐션 컨셉의 수
        self.head_dim = hidden_dim // n_heads # 각 헤드에서의 임베딩 차원

        self.fc_q = nn.Linear(hidden_dim, hidden_dim) # Query 값에 적용될 FC 레이어
        self.fc_k = nn.Linear(hidden_dim, hidden_dim) # Key 값에 적용될 FC 레이어
        self.fc_v = nn.Linear(hidden_dim, hidden_dim) # Value 값에 적용될 FC 레이어

        self.fc_o = nn.Linear(hidden_dim, hidden_dim)

        self.dropout = nn.Dropout(dropout_ratio)

        self.scale = torch.sqrt(torch.FloatTensor([self.head_dim])).to(device)

    def forward(self, query, key, value, mask = None):

        batch_size = query.shape[0]

        # query : [batch_size, query_len, hidden_dim]
        # key : [batch_size, key_len, hidden_dim]
        # value : [batch_size, value_len, hidden_dim]

        Q = self.fc_q(query)
        K = self.fc_k(key)
        V = self.fc_v(value)

        # Q : [batch_size, query_len, hidden_dim]
        # K : [batch_size, key_len, hidden_dim]
        # V : [batch_size, value_len, hidden_dim]

        # hidden_dim -> n_heads X head_dim 형태로 변형
        # n_heads(h)개의 서로 다른 어텐션 컨셉을 학습하도록 유도
        Q = Q.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        K = K.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        V = V.view(batch_size, -1, self.n_heads, self.head_dim).permute(0, 2, 1, 3)

        # Attention Energy 계산
        energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale # matmul(): 행렬 곱

        # energy : [batch_size, n_heads, query_len, key_len]

        # 마스크를 사용하는 경우
        if mask is not None:
            energy = energy.masked_fill(mask==0, -1e10)

        # 어텐션 스코어 계산 : 각 단어에 대한 확률 값
        attention = torch.softmax(energy, dim=-1)

        # attention: [batch_size, n_heads, query_len, key_len]

        # 여기에서 Scaled Dot-Product Attention을 계산
        x = torch.matmul(self.dropout(attention), V)

        # x: [batch_size, n_heads, query_len, head_dim]

        x = x.permute(0, 2, 1, 3).contiguous() # contiguous(): 텐서를 연속적인 메모리에 저장

        # x: [batch_size, query_len, n_heads, head_dim]

        x = x.view(batch_size, -1, self.hidden_dim)

        # x: [batch_size, query_len, hidden_dim]

        x = self.fc_o(x)

        # x: [batch_size, query_len, hidden_dim]

        return x, attention

head_dim은 각 헤드에서의 임베딩 차원이다. (논문에서 head_dim=512)

√임베딩 차원으로 나누는 이유(scaling 해주는 이유)는 임베딩 차원이 큰 경우 내적값이 굉장히 커짐에 따라 특정 softmax 값이 1에 근사하게 되어 gradient가 소실되는 결과를 낳게 된다.

 

self.fc_q = nn.Linear(hidden_dim, hidden_dim) 부분을 보면 hidden_dim을 hidden_dim으로 변경해주는 것을 알 수 있다.

이러한 self.fc_q 함수는 forward 부분에서 self.fc_q(query)로 사용되는데 query에서의 hidden_dim을 hidden_dim으로 변경해주는 것을 알 수 있다. (query는 [batch_size, query_len, hidden_dim]으로 맨 마지막 차원을 변경해주는 것이다.)

 

Position-wise Feedforward

class PositionwiseFeedforwardLayer(nn.Module):
    def __init__(self, hidden_dim, pf_dim, dropout_ratio):
        super().__init__()

        self.fc_1 = nn.Linear(hidden_dim, pf_dim)
        self.fc_2 = nn.Linear(pf_dim, hidden_dim)

        self.dropout = nn.Dropout(dropout_ratio)

    def forward(self, x):

        # x : [batch_size, seq_len, hidden_dim]

        x = self.dropout(torch.relu(self.fc_1(x)))

        # x : [batch_size, seq_len, pf_dim]

        x = self.fc_2(x)

        # x : [batch_size, seq_len, hidden_dim]

        return x

 

Encoder Layer Architecture

class EncoderLayer(nn.Module):
    def __init__(self, hidden_dim, n_heads, pf_dim, dropout_ratio, device):
        super().__init__()

        self.self_attn_layer_norm = nn.LayerNorm(hidden_dim)
        self.ff_layer_norm = nn.LayerNorm(hidden_dim)
        self.self_attention = MultiHeadAttentionLayer(hidden_dim, n_heads, dropout_ratio, device)
        self.positionwise_feedforward = PositionwiseFeedforwardLayer(hidden_dim, pf_dim, dropout_ratio)
        self.dropout = nn.Dropout(dropout_ratio)

    # 하나의 임베딩이 복제되어 Query, Key, Value로 입력되는 방식
    def forward(self, src, src_mask):

        # src: [batch_size, src_len, hidden_dim]
        # src_mask: [batch_size, src_len]

        # self attention
        # 필요한 경우 마스크(mask) 행렬을 이용하여 어텐션(attention)할 단어를 조절 가능
        _src, _ = self.self_attention(src, src, src, src_mask)

        # dropout, residual connection and layer norm
        src = self.self_attn_layer_norm(src + self.dropout(_src))

        # src: [batch_size, src_len, hidden_dim]

        # position-wise feedforward
        _src = self.positionwise_feedforward(src)

        # dropout, residual and layer norm
        src = self.ff_layer_norm(src + self.dropout(_src))

        # src: [batch_size, src_len, hidden_dim]

        return src

 

최종 Transformer Encoder

class Encoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, n_layers, n_heads, pf_dim, dropout_ratio, device, max_length=100):
        super().__init__()

        self.device = device

        self.tok_embedding = nn.Embedding(input_dim, hidden_dim)
        self.pos_embedding = nn.Embedding(max_length, hidden_dim)

        self.layers = nn.ModuleList([EncoderLayer(hidden_dim, n_heads, pf_dim, dropout_ratio, device) for _ in range(n_layers)])

        self.dropout = nn.Dropout(dropout_ratio)

        self.scale = torch.sqrt(torch.FloatTensor([hidden_dim])).to(device)

    def forward(self, src, src_mask):

        # src: [batch_size, src_len]
        # src_mask: [batch_size, src_len]

        batch_size = src.shape[0]
        src_len = src.shape[1]

        pos = torch.arange(0, src_len).unsqueeze(0).repeat(batch_size, 1).to(self.device)

        # pos: [batch_size, src_len]

        # 소스 문장의 임베딩과 위치 임베딩을 더한 것을 사용
        src = self.dropout((self.tok_embedding(src) * self.scale) + self.pos_embedding(pos))

        # src: [batch_size, src_len, hidden_dim]

        # 모든 인코더 레이어를 차례대로 거치면서 순전파(forward) 수행
        for layer in self.layers:
            src = layer(src, src_mask)

        # src: [batch_size, src_len, hidden_dim]

        return src # 마지막 레이어의 출력을 반환

 

 

Encoder를 활용한 Classification model (분류 헤드 추가하기)

 

각 토큰에 대한 은닉 상태가 있어 토큰마다 예측을 할 수 있으나 필요한 예측은 단 하나이다. 일반적으로 첫 번째 토큰을 예측에 사용하고 드롭아웃 층과 선형 층을 추가해 분류 예측을 만든다.(다른 토큰보다 [CLS] 토큰이 전체적인 정보를 포함하기 때문)