이제부터 S2S 모델의 어텐션 메커니즘을 간소화하여 구현해보겠습니다. 데이터셋은 쌍으로 이루어진 말뭉치로 영어 문장과 이에 상응하는 프랑스어 번역으로 구성되어 있습니다. 모델의 인코더는 양방향 GRU유닛을 사용하고 시퀀스에 있는 모든 부분의 정보를 기반으로 입력 시퀀스의 각 위치에 대한 벡터를 계산합니다. 파이토치의 PackedSequence 데이터 구조를 사용하며, 이후에 나올 ‘NMT 모델의 인코딩과 디코딩’에서 자세히 다뤄보겠습니다.
기계 번역 데이터셋
이번 예제에서는 데이터셋으로 타토에바 프로젝트(Tatoeba Project)의 영어-프랑스어 문장 쌍으로 구성된 데이터를 사용합니다. 우선 모든 문자를 소문자로 변환하고, NLTK를 이용하여 영어, 프랑스어 토큰화를 각 문장 쌍에 적용합니다. 이후 NLTK의 언어에 특화된 단어 토큰화를 적용해 토큰리스트를 만듭니다. 방금까지의 기본 전처리에 특정 문장 패턴을 지정하여 데이터의 일부분만 선택해 학습 문제를 단순하게 만들 수 있습니다. 제한된 문장 패턴으로 데이터 범위를 좁히는 것입니다. 이렇게 하면 모델의 분산을 낮추고 짧은 시간 내에 높은 성능 달성이 가능합니다.
NMT를 위한 벡터 파이프라인
소스 영어와 타깃 프랑스어를 벡터로 변환하기 위해서는 복잡한 파이프라인이 필요합니다. 복잡도가 증가하는 요인에는 크게 두 가지가 있습니다. 첫 번째로 소스와 타깃 시퀀스는 모델에서 다른 역할을 하고, 언어도 다르며, 벡터화되는 방식도 다르기 때문입니다. 둘 째로 파이토치의 PackedSequence를 사용할 때에는 소스 시퀀스의 길이에 따라 각 미니배치를 소팅해야하기 때문입니다. 위 두 가지 문제 때문에 NMTVectorizer는 별도의 SequenceVocabulary 객체 두 개를 만들고, 최대 시퀀스 길이를 따로 측정합니다.
class NMTVectorizer(object):
""" 어휘 사전을 생성하고 관리합니다 """
def __init__(self, source_vocab, target_vocab, max_source_length, max_target_length):
"""
매개변수:
source_vocab (SequenceVocabulary): 소스 단어를 정수에 매핑합니다
target_vocab (SequenceVocabulary): 타깃 단어를 정수에 매핑합니다
max_source_length (int): 소스 데이터셋에서 가장 긴 시퀀스 길이
max_target_length (int): 타깃 데이터셋에서 가장 긴 시퀀스 길이
"""
self.source_vocab = source_vocab
self.target_vocab = target_vocab
self.max_source_length = max_source_length
self.max_target_length = max_target_length
@classmethod
def from_dataframe(cls, bitext_df):
""" 데이터셋 데이터프레임으로 NMTVectorizer를 초기화합니다
매개변수:
bitext_df (pandas.DataFrame): 텍스트 데이터셋
반환값
:
NMTVectorizer 객체
"""
source_vocab = SequenceVocabulary()
target_vocab = SequenceVocabulary()
max_source_length = 0
max_target_length = 0
for _, row in bitext_df.iterrows():
source_tokens = row["source_language"].split(" ")
if len(source_tokens) > max_source_length:
max_source_length = len(source_tokens)
for token in source_tokens:
source_vocab.add_token(token)
target_tokens = row["target_language"].split(" ")
if len(target_tokens) > max_target_length:
max_target_length = len(target_tokens)
for token in target_tokens:
target_vocab.add_token(token)
return cls(source_vocab, target_vocab, max_source_length, max_target_length)
Python
복사
복잡도가 증가하는 첫 번째 요인으로 소스와 타깃 시퀀스를 다루는 방법이 다르다고 설명했습니다. 소스 시퀀스는 시작 부분에 BEGIN-OF-SEQUENCE 토큰을, 마지막에 END-OF-SEQUENCE 토큰을 추가하며 벡터화합니다. 이 모델은 양방향 GRU를 사용하여 소스 시퀀스에 있는 토큰을 위한 요약 벡터를 만듭니다. 반면에 타깃 시퀀스는 토큰 하나가 밀린 복사본 두 개로 벡터화됩니다. 시퀀스 예측 작업에는 타임 스텝마다 입력 토큰과 출력 토큰이 필요한데, S2S의 모델 디코더가 이 작업을 수행하면서도 인코더 문맥이 추가됩니다. 이 작업을 단순화하기 위해 소스와 타깃 인덱스에 상관없이 벡터화를 수행하는 _vectorize() 메서드를 만들었습니다. 다음으로 인덱스를 각기 처리하는 두 메소드를 만듭니다.
def _vectorize(self, indices, vector_length=-1, mask_index=0):
"""인덱스를 벡터로 변환합니다
매개변수:
indices (list): 시퀀스를 나타내는 정수 리스트
vector_length (int): 인덱스 벡터의 길이
mask_index (int): 사용할 마스크 인덱스; 거의 항상 0
"""
if vector_length < 0:
vector_length = len(indices)
vector = np.zeros(vector_length, dtype=np.int64)
vector[:len(indices)] = indices
vector[len(indices):] = mask_index
return vector
def _get_source_indices(self, text):
""" 벡터로 변환된 소스 텍스트를 반환합니다
매개변수:
text (str): 소스 텍스트; 토큰은 공백으로 구분되어야 합니다
반환값:
indices (list): 텍스트를 표현하는 정수 리스트
"""
indices = [self.source_vocab.begin_seq_index]
indices.extend(self.source_vocab.lookup_token(token) for token in text.split(" "))
indices.append(self.source_vocab.end_seq_index)
return indices
def _get_target_indices(self, text):
""" 벡터로 변환된 타깃 텍스트를 반환합니다
매개변수:
text (str): 타깃 텍스트; 토큰은 공백으로 구분되어야 합니다
반환값:
튜플: (x_indices, y_indices)
x_indices (list): 디코더에서 샘플을 나타내는 정수 리스트
y_indices (list): 디코더에서 예측을 나타내는 정수 리스트
"""
indices = [self.target_vocab.lookup_token(token) for token in text.split(" ")]
x_indices = [self.target_vocab.begin_seq_index] + indices
y_indices = indices + [self.target_vocab.end_seq_index]
return x_indices, y_indices
def vectorize(self, source_text, target_text, use_dataset_max_lengths=True):
""" 벡터화된 소스 텍스트와 타깃 텍스트를 반환합니다
벡터화된 소스 텍슽트는 하나의 벡터입니다.
벡터화된 타깃 텍스트는 7장의 성씨 모델링과 비슷한 스타일로 두 개의 벡터로 나뉩니다.
각 타임 스텝에서 첫 번째 벡터가 샘플이고 두 번째 벡터가 타깃이 됩니다.
매개변수:
source_text (str): 소스 언어의 텍스트
target_text (str): 타깃 언어의 텍스트
use_dataset_max_lengths (bool): 최대 벡터 길이를 사용할지 여부
반환값:
다음과 같은 키에 벡터화된 데이터를 담은 딕셔너리:
source_vector, target_x_vector, target_y_vector, source_length
"""
source_vector_length = -1
target_vector_length = -1
if use_dataset_max_lengths:
source_vector_length = self.max_source_length + 2
target_vector_length = self.max_target_length + 1
source_indices = self._get_source_indices(source_text)
source_vector = self._vectorize(source_indices,
vector_length=source_vector_length,
mask_index=self.source_vocab.mask_index)
target_x_indices, target_y_indices = self._get_target_indices(target_text)
target_x_vector = self._vectorize(target_x_indices,
vector_length=target_vector_length,
mask_index=self.target_vocab.mask_index)
target_y_vector = self._vectorize(target_y_indices,
vector_length=target_vector_length,
mask_index=self.target_vocab.mask_index)
return {"source_vector": source_vector,
"target_x_vector": target_x_vector,
"target_y_vector": target_y_vector,
"source_length": len(source_indices)}
Python
복사
복잡도의 다음 요인은 소스 시퀀스입니다. 양방향 GRU로 소스 시퀀스를 인코딩할 때 파이토치의 PackedSequence 데이터 구조를 사용하게 됩니다. 가변 길이 시퀀스의 미니배치는 각 시퀀스를 행으로 쌓은 정수 행렬로 표현되며, 시퀀스는 왼쪽 정렬되고 제로 패딩되어 가변 길이를 허용하게 됩니다. PackedSequence 데이터 구조는 아래 그림과 같이 가변 길이 시퀀스 미니배치를 배열 하나로 표현합니다. 시퀀스의 타임 스텝 데이터를 차례대로 연결하고 타임 스텝마다 시퀀스 길이를 기록하게 됩니다.
PackedSequence를 만들려면 각 시퀀스의 길이를 알아야 하며, 시퀀스의 길이 순서대로 내림차순 정렬을 해야 합니다. 정렬된 행렬을 만들기 위해서 미니배치에 있는 텐서를 시퀀스 길이 순서대로 정렬합니다. 아래 코드는 generate_batches()를 수정한 generate_nmt_batches() 함수입니다.
def generate_nmt_batches(dataset, batch_size, shuffle=True,
drop_last=True, device="cpu"):
""" 파이토치 DataLoader를 감싸고 있는 제너레이터 함수; NMT 버전 """
dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
shuffle=shuffle, drop_last=drop_last)
for data_dict in dataloader:
lengths = data_dict['x_source_length'].numpy()
sorted_length_indices = lengths.argsort()[::-1].tolist()
out_data_dict = {}
for name, tensor in data_dict.items():
out_data_dict[name] = data_dict[name][sorted_length_indices].to(device)
yield out_data_dict
Python
복사
인코딩과 디코딩
영어를 프랑스어로 번역하는 기계 변역을 실현하기 위해서는 인코더-디코더 모델을 사용합니다. 인코더가 양방향 GRU를 사용하여 소스 시퀀스(영어 문장)을 벡터 상태의 시퀀스로 매핑하면 디코더가 인코더에서 출력된 은닉 상태를 초기 은닉 상태로 받아와 어텐션 메커니즘으로 소스 시퀀스를 토대로 출력 시퀀스(프랑스어 번역문)를 생성하는 과정을 거칩니다.
다음과 같이 신경망 기계 번역 모델 NMTModel을 코딩해봅니다. NMTModel은 하나의 forward() 메서드(정방향 계산하는 메서드)에 인코더와 디코더를 캡슐화하여 관리합니다.
class NMTModel(nn.Module):
"""
신경망 기계 번역 모델
"""
def __init__(self, source_vocab_size, source_embedding_size,
target_vocab_size, target_embedding_size, encoding_size,
target_bos_index):
"""
매개변수:
source_vocab_size (int): 소스 언어의 고유한 단어 개수
source_embedding_size (int): 소스 임베딩 벡터의 크기
target_vocab_size (int): 타깃 언어의 고유한 단어 개수
target_embedding_size (int): 타깃 임베딩 벡터의 크기
encoding_size (int): 인코더 RNN의 크기
target_bos_index (int): BEGIN-OF-SEQUENCE 토큰 인덱스
"""
super(NMTModel, self).__init__()
self.encoder = NMTEncoder(num_embeddings = source_vocab_size,
embedding_size = source_embedding_size,
rnn_hidden_size = encoding_size)
decoding_size = encoding_size = 2
self.decoder = NMTDecoder(num_embeddings = target_vocab_size,
embedding_size = tarrget_embedding_size,
rnn_hidden_size = decoding_size,
bos_index = target_bos_index)
def forward(self, x_source, x_source_lengths, target_sequence):
"""
모델의 정방향 계산
매개변수:
x_source (torch.Tensor): 소스 텍스트 데이터 센서
x_source.shape는 (batch, vectorizer.max_source_length)입니다.
x_source_lengths torch.Tensor): x_source의 시퀀스 길이
target_sequence (torch.Tensor): 타깃 텍스트 데이터 텐서
반환값:
decoded_states (torch.Tensor): 각 출력 타임 스텝의 예측 벡터
"""
encoder_state, final_hidden_states = self.encoder(x_source,
x_source_lengths)
decoded_states = self.decoder(encoder_state = encoder_state,
initial_hidden_state = final_hidden_states,
target_sequence = target_sequence)
return decoded_states
Python
복사
다음으로 양방향 GRU를 사용하여 단어를 임베딩하고 특성을 추출하는, 즉, 소스 시퀀스를 벡터 상태로 매핑하는 인코더 NMTEncoder를 코딩해봅니다. 여기서 인코더의 출력은 양방향 GRU의 최종 은닉 상태가 되고 이를 이후 디코더가 받게 됩니다.
코드를 자세히 살펴보면 우선 임베딩 층을 사용해 입력 시퀀스를 임베딩합니다. 이때, 가변 길이 시퀀스는 padding_idx라는 매개변수를 통해 처리합니다. padding_idx와 동일한 모든 위치는 0벡터가 되며 최적화 과정을 거칠 때 업데이트되지 않는 마스킹(masking)이 되기 때문에 가변 길이 시퀀스 처리가 가능합니다.
다만 양방향 GRU는 특별히 순방향일 때와 역방향일 때의 마스킹된 위치가 달라질 수 있기 때문에 인코더-디코더 모델에서는 마스킹 위치를 다른 방식으로, 파이토치의 PackedSequence 데이터 구조를 사용하여 처리한다고 합니다.
class NMTEncoder(nn.Module):
def __init__(self, num_embeddings, embedding_size, rnn_hidden_size):
"""
매개변수:
num_embeddings (int): 임베딩 개수는 소스 어휘 사전의 크기입니다
embedding_size (int): 임베딩 벡터의 크기
rnn_hidden_size (int): RNN 은닉 상태 벡터의 크기
"""
super(NMTEncoder, self).__init__()
self.source_embedding = nn.Embedding(num_embeddings, embedding_size,
padding_idx = 0)
self.birnn = nn.GRU(embedding_size, rnn_hidden_size, bidirectional = True,
batch_first = True)
def forward(self, x_source, x_lengths):
x_embedded = self.source_embedding(x_source)
# create PackedSequence 생성 x_packed.data.shape = (number_items,
# embedding_size)
x_lengths = x_lengths.detatch().cpu().numpy()
x_packed = pack_padded_sequence(x_embedded, x_lengths, batch_first = True)
# x_birnn_h.shape = (num_rnn, batch_size, feature_size)
x_birnn_out, x_birnn_h = self.birnn(x_packed)
# (batch_size, num_rnn. feature_size)로 변환
x_birnn_h = x_brnn_h.permute(1, 0, 2)
# 특성 펼침. (batch_size, num_rnn * feature_size)로 바꾸기
# (참고: -1은 남은 차원에 해당하며,
# 두 깨의 RNN 은닉 벡터를 1로 펼칩니다)
x_birnn_h = x_birnn_h.contiguous().view(x_birnn_h.size(0), -1)
x_unpacked, _ = pad_packed_sequence(x_birnn_out, batch_first = True)
return x_unpacked, x_birnn_h
Python
복사
이제 인코더의 출력인 최종 은닉 상태를 디코더 NMTDecoder이 받아 타임 스텝을 순회하면서 출력 시퀀스를 생성합니다.
이 예제는 타깃 시퀀스가 타임 스텝마다 샘플로 제공된다는 특이점이 있습니다. GRUCell을 사용해 은닉 상태를 계산하면 인코더의 최종 은닉 상태에 Linear층을 적용하여 초기 은닉 상태를 계산하는데, 이때 디코더 GRU는 임베딩된 입력 토큰과 마지막 타임 스텝의 문맥 벡터를 연결한 벡터를 입력 받는다고 합니다. 그리고 쿼리 벡터를 사용하여 그 새로운 입력 벡터를 현재 타임 스텝에서 어텐션 메커니즘으로 새로운 문맥 벡터를 만든 후 은닉 상태와 연결하여 디코딩 정보를 표현하는 벡터를 만듭니다. 이 벡터를 이용하여 분류기(간단한 Linear층)가 예측 벡터 score_for_y_t_index를 생성하고 소프트맥스 함수를 이용하여 예측 벡터를 생성합니다.
class NMTDecoder(nn.Module):
def __init__(self, num_embeddings, embedding_size, rnn_hidden_size, bos_index):
"""
매개변수:
num_embeddings (int): 임베딩 개수는 타깃 어휘 사전의 고유한 단어의 개수입니다
embeddin_size (int): 임베딩 벡터 크기
rnn_hidden_size (int): RNN 은닉 상태 크기
bos_index (int): begin-of-sequence 인덱스
"""
super(NMTDecoder, self).__init__()
self._rnn_hidden_size = rnn_hidden_size
self.target_embedding = nn.Embedding(num_embeddings = num_embeddings,
embedding_dim = embedding_size,
padding_ide = 0)
self.gru_cell = nn.GRUCell(embedding_size + rnn_hidden_size,
rnn_hidden_size)
self.hidden_map = nn.Linear(rnn_hidden_size, rnn_hidden_size)
self.classifie = nn.Linear(rnn_hidden_size * 2, num_embeddings)
self.bos_index = bos_index
def _init_indices(self, batch_size):
"""
BEGIN-OF-SEQUENCE 인덱스 벡터를 반환합니다
"""
return torch.ones(batch_size, dtype = torch.int64) * self.bos_index
def _init_context_vectors(self, batch_size):
"""
문맥 벡터를 초기화하기 위한 0 벡터를 반환합니다
"""
return torch.zeros(batch_size, self._rnn_hidden_size)
def forward(self, encoder_state, initial_hidden_state, target_sequence):
"""
"""
# 가정: 첫 번째 차원은 배치 차원입니다
# 즉 입력은 (Batch, Seq)
# 시퀀스에 대해 반복해야 하므로 (Seq, Batch)로 차원을 바꿉니다
target_sequence = target_sequence.permute(1,0)
# 주어진 인코더의 은닉 상태를 초기 은닉 상태로 사용합니다
h_t = self.hidden_map(initial_hidden_state)
batch_size = encoder_state_size(0)
# 문맥 벡터를 0으로 초기화합니다
context_vectors - self._init_context_vectors(batch_size)
# 처 단어 y_t를 BOS로 초기화합니다
y_t_index = self._init_indeices(batch_size)
h_t = h_t.to(encoder_state.device)
y_index = y_t_index.to(encoder_state.device)
context_vectors = context_vectors.to(encoder_state.device)
output_vectors = []
#분석을 위해 GPU에서 캐싱된 모든 텐서를 가져와 저장합니다
self._cached_p_attn = []
self._cached_ht = []
self._cached_decoder_state = encoder_state.cpu().detatch().numpy()
output_sequence_size = target_sequence_size(0)
for i in range(output_sequence_size):
# 1단계: 단어를 임베딩하고 이전 문맥과 연결합니다
y_input_vector = self.target_embedding(target_sequence[i])
rnn_input = torch.cat([y_input_vector, context_vectors], dim = 1)
# 2단계: GRU를 적용하고 새로운 은닉 벡터를 얻습니다
h_t = self.gru_cell(rnn_input, h_t)
self._cached_ht.append(h_t.cpu().data.numpy())
# 3단계: 현재 은닉 상태를 사용해 인코더의 상태를 주목합니다
context_vectors, p_attn, _ = \
verbose_attention(encoder_state_vectors = encoder_state,
query_vector = h_t)
# 부가 작업: 시각화를 위해 어텐션 확률을 저장합니다
self._cached_p_attn.append(p_attn.cpu().detatch().numpy())
# 4단계: 현재 은닉 상태와 문맥 벡터를 사용해 다음 단어를 예측합니다
prediction_vector = torch.cat((context_vectors, h_t), dim = 1)
score_for_y_t_index = self.classifier(prediction_vector)
# 부가 작업: 예측 성능 점수를 기록합니다
output_vectors.append(score_for_y_t_index)
Python
복사
어텐션 메커니즘 자세히 알아보기
이 예제에서는 어텐션 메커니즘의 동작을 이해하는 것을 목표로 합니다. 이전 글 에서 설명했던 어텐션 메커니즘의 모델을 다시 한 번 가져와 수식적으로 살펴보겠습니다.
ate를 계산하기 위한 디코더 RNN에 입력으로 들어오는 디코더 은닉 상태 값을 쿼리(Query)라고 부르고, 인코더 RNN의 각 출력값들을 키(Key), 값(Value)라고 부릅니다. 어떤 단어들에 집중할 지를 나타내는 값인 어텐션 값(Attention Score)를 구하기 위해, 8-3에서 설명했던 에너지 값(Energy - 단어끼리 얼마나 연관성이 있는가)를 구해야 합니다. 이를 위해, 디코더의 은닉 값인 쿼리와 인코더의 각 은닉 값인 키를 내적해줍니다. 여기에서 연산 결과로 스칼라 값을 얻기 위해, 디코더 값은 전치하여 내적을 수행합니다.
구한 일련의 스칼라값들을 0과 1사이의 값이면서 총 합이 1인 확률 분포로 변환하기 위해 소프트맥스 함수를 적용해줍니다. 소프트맥스 함수는 주어진 값들의 비율은 유지하면서 총 합이 1이 되도록 만들어주는, 확률 분포로 변환하기 위해 사용하는 함수입니다. 이 값이 위 이미지에서는 가중치로 표현되어있습니다.
뒤에서 다룰 표이지만, 설명에 도움을 주기 위해 앞으로 가져왔습니다. 디코더의 어텐션 확률 분포를 아래 표에서 살펴볼 수 있습니다. 이는, 소스 문장과 번역된 문장의 각 단어간에 높은 관계를 갖고있는 단어끼리 높은 확률 분포를 보여줍니다. 확률 분포가 높은 단어의 값에 더 많은 가중치를 주고, 단어 번역할 때 관련성이 높은 단어에 초점을 맞춘다고 생각할 수 있습니다.
각 인코더의 은닉 값과 앞서 구한 가중치를 곱한 뒤, 곱한 값들의 합인 가중 합(Weighted Sum)을 구합니다. 이를 통해 어텐션 값 (Attention Score, Attention Value,) 또는 문맥을 담고있는 벡터인 문맥 벡터(Context Vector)라고 부릅니다. 이 어텐션 값을 통해, 모델은 각 단어를 처리하기 위해 어떤 단어에 집중해야 더 좋은 성능을 얻을 수 있는지를 학습할 수 있습니다.
아래는 어텐션 메커니즘을 코드로 구현한 것입니다. 첫 번째 어텐션 함수인 verbose_attention 은 위에서 설명한 어텐션 메커니즘을 한 줄에 하나씩 자세하게 설명해둔 것이고, 두 번째 terse_attention 은 matmul을 사용해 조금 더 효율적으로 연산하는 과정을 구현한 함수입니다.
def verbose_attention(encoder_state_vectors, query_vector):
""" 원소별 연산을 사용하는 어텐션 메커니즘 버전
매개변수:
encoder_state_vectors (torch.Tensor): 인코더의 양방향 GRU에서 출력된 3차원 텐서
query_vector (torch.Tensor): 디코더 GRU의 은닉 상태
"""
batch_size, num_vectors, vector_size = encoder_state_vectors.size()
vector_scores = torch.sum(encoder_state_vectors * query_vector.view(batch_size, 1, vector_size), dim=2)
# 쿼리 값과 각 인코더 RNN의 은닉 벡터들의 곱
vector_probabilities = F.softmax(vector_scores, dim=1)
# 값들을 0~1의 값으로 확률 분포화로 가중치 구하기
weighted_vectors = encoder_state_vectors * vector_probabilities.view(batch_size, num_vectors, 1)
# 은닉 벡터값과 가중치의 곱 구하기
context_vectors = torch.sum(weighted_vectors, dim=1)
# 이 값들의 합으로 컨텍스트 벡터 또는 어텐션 값 구하기
return context_vectors, vector_probabilities, vector_scores
def terse_attention(encoder_state_vectors, query_vector):
""" 점곱을 사용하는 어텐션 메커니즘 버전
매개변수:
encoder_state_vectors (torch.Tensor): 인코더의 양방향 GRU에서 출력된 3차원 텐서
query_vector (torch.Tensor): 디코더 GRU의 은닉 상태
"""
vector_scores = torch.matmul(encoder_state_vectors, query_vector.unsqueeze(dim=2)).squeeze()
# matmul 함수를 이용해 쿼리*키 값 구하기
vector_probabilities = F.softmax(vector_scores, dim=-1)
# 가중치 구하기
context_vectors = torch.matmul(encoder_state_vectors.transpose(-2, -1), vector_probabilities.unsqueeze(dim=2)).squeeze()
# 컨텍스트 벡터 값 구하기
return context_vectors, vector_probabilities
Python
복사
스케줄링된 샘플링 Scheduled Sampling
학습 과정에서 주어지는 데이터에서는 타겟 시퀀스가 제공되고, 이를 이용해 각 타임스텝마다 연산과 학습을 진행하지만, 실제 데이터 혹은 테스트 데이터에서는 모델이 만드는 시퀀스가 어떤지 알 수 없기 때문에, 이러한 과정이 작동하지 않을 수 있습니다. 즉, 학습 시에는 타겟 시퀀스가 있지만, 테스트에서는 타겟 시퀀스가 없어 가중치가 크게 벗어나는 문제가 발생할 수 있습니다.
이를 해결하기 위해, 샘플링 기법을 통해 학습 과정에서도 일부 타겟 시퀀스를 모델에게 맡기는 방법을 활용합니다. 간단하게 말하면, 샘플링 기법은 데이터 중 일부만 뽑아서 활용한다는 의미입니다. 학습 과정에서 주어진 타겟 시퀀스와 모델이 자체 생성한 시퀀스를 무작위로 사용하며, 모델이 결정하는 시퀀스와 확률 분포가 개선되도록 모델을 학습시킵니다.
이를 위해 샘플링 확률을 먼저 설정해둡니다. 초기 인덱스를 시작 토큰인 BEGIN 인덱스로 먼저 지정하고 시작합니다. 그 다음, 시퀀스 생성문을 반복할 때 랜덤한 값(난수)를 발생시키고, 확률보다 작으면 모델의 예측 시퀀스를 사용하고, 확률보다 크면 주어진 타겟 시퀀스를 사용하면서 예측을 수행하고 학습을 진행합니다.
아래는 스케줄링된 샘플링을 앞서 정의했던 NMTDecoder에 forward 함수를 수정해 작성한 내용입니다. sample_probability 값을 지정해줄 때, 0에 가까울수록 예측 시퀀스를 사용하고, 1에 가까울수록 타겟 시퀀스를 사용하게 됩니다. 또, 반복문마다 use_sample = np.random.random() < sample_probability 라는 코드를 사용하여, 난수 값을 기준으로 확률에 의거해 모델 예측 시퀀스를 사용할 지, 아니면 타겟 시퀀스를 사용할 지 결정하게 됩니다.
class NMTDecoder(nn.Module):
def __init__(self, num_embeddings, embedding_size, rnn_hidden_size, bos_index):
super(NMTDecoder, self).__init__()
# 앞서 작성한 코드와 동일한 정의
# 생략
def forward(self, encoder_state, initial_hidden_state, target_sequence, sample_probability=0.0):
""" 모델의 정방향 계산
매개변수:
encoder_state (torch.Tensor): NMTEncoder의 출력
initial_hidden_state (torch.Tensor): NMTEncoder의 마지막 은닉 상태
target_sequence (torch.Tensor): 타깃 텍스트 데이터 텐서
sample_probability (float): 스케줄링된 샘플링 파라미터
디코더 타임 스텝마다 모델 예측에 사용할 확률
반환값:
output_vectors (torch.Tensor): 각 타임 스텝의 예측 벡터
"""
# 샘플 확률을 지정해줍니다
# 0과 1 사이의 적당한 값을 지정해줍니다
# 0 : 예측 시퀀스만을 사용
# 1 : 타겟 시퀀스만을 사용
if target_sequence is None:
sample_probability = 0.5
else:
# 가정: 첫 번째 차원은 배치 차원입니다
# 즉 입력은 (Batch, Seq)
# 시퀀스에 대해 반복해야 하므로 (Seq, Batch)로 차원을 바꿉니다
target_sequence = target_sequence.permute(1, 0)
output_sequence_size = target_sequence.size(0)
# 주어진 인코더의 은닉 상태를 초기 은닉 상태로 사용합니다
h_t = self.hidden_map(initial_hidden_state)
batch_size = encoder_state.size(0)
# 문맥 벡터를 0으로 초기화합니다
context_vectors = self._init_context_vectors(batch_size)
# 첫 단어 y_t를 BOS로 초기화합니다
y_t_index = self._init_indices(batch_size)
h_t = h_t.to(encoder_state.device)
y_t_index = y_t_index.to(encoder_state.device)
context_vectors = context_vectors.to(encoder_state.device)
output_vectors = []
self._cached_p_attn = []
self._cached_ht = []
self._cached_decoder_state = encoder_state.cpu().detach().numpy()
# 반복문 이전까지는 기존의 Decoder 코드와 동일합니다
for i in range(output_sequence_size):
# 스케줄링된 샘플링 사용 여부
# 생성한 난수와 확률 값을 비교해 샘플링 사용 유무를 선택합니다
use_sample = np.random.random() < sample_probability
if not use_sample:
y_t_index = target_sequence[i]
# 단계 1: 단어를 임베딩하고 이전 문맥과 연결합니다
y_input_vector = self.target_embedding(y_t_index)
rnn_input = torch.cat([y_input_vector, context_vectors], dim=1)
# 단계 2: GRU를 적용하고 새로운 은닉 벡터를 얻습니다
h_t = self.gru_cell(rnn_input, h_t)
self._cached_ht.append(h_t.cpu().detach().numpy())
# 단계 3: 현재 은닉 상태를 사용해 인코더의 상태를 주목합니다
context_vectors, p_attn, _ = verbose_attention(encoder_state_vectors=encoder_state, query_vector=h_t)
# 부가 작업: 시각화를 위해 어텐션 확률을 저장합니다
self._cached_p_attn.append(p_attn.cpu().detach().numpy())
# 단게 4: 현재 은닉 상태와 문맥 벡터를 사용해 다음 단어를 예측합니다
prediction_vector = torch.cat((context_vectors, h_t), dim=1)
score_for_y_t_index = self.classifier(F.dropout(prediction_vector, 0.3))
if use_sample:
p_y_t_index = F.softmax(score_for_y_t_index * self._sampling_temperature, dim=1)
# _, y_t_index = torch.max(p_y_t_index, 1)
y_t_index = torch.multinomial(p_y_t_index, 1).squeeze()
# 부가 작업: 예측 성능 점수를 기록합니다
output_vectors.append(score_for_y_t_index)
output_vectors = torch.stack(output_vectors).permute(1, 0, 2)
return output_vectors
Python
복사
이 과정을 통해, 모델은 시퀀스를 예측하는 방법을 학습하여, 모델이 예측한 시퀀스와 가중치가 정답에서 크게 벗어나는 경우를 줄여줍니다.
모델 훈련
이번 장에서 다룬 모델의 훈련 과정은 앞서 6장과 7장에서 다룬 모델의 훈련 과정과 비슷합니다.
1.
소스 시퀸스와 타겟 시퀀스를 입력받아, 타겟 시퀀스 예측 생성
2.
타겟 시퀀스 예측 레이블을 통해 크로스 엔트로피 손실 계산
크로스 엔트로피 손실은 확률 분포와 예측 분포 사이의 차이를 계산하여, 분류 모델이 예측을 잘 수행하는지를 평가하는 지표입니다.
3.
역전파를 통해 그래디언트를 계산
4.
옵티마이저를 통해 모델 파라미터를 업데이트
위 과정으로 훈련한 모델에 대해, 소스 문장과 모델에 의해 생성된 문장 쌍에 대해 BLEU 지표로 평가하여 모델이 얼마나 잘 작동하는지 확인합니다. 우리는 앞에서 2가지 모델을 살펴봤습니다. 1) 제공된 타겟 시퀀스를 바탕으로 디코더에 입력하는 모델. 2) 스케줄링된 샘플링을 통해 자체 예측을 만들어 디코더에 입력하는 모델. 이 중에서 2번째 모델, 자체 예측을 사용하는 모델은 시퀀스 예측에 대한 오류를 최적화하도록 하는 장점을 갖고있습니다. 두 모델의 BLEU 점수는 다음과 같습니다. 1번 모델보다 2번 모델에서 조금 더 높은 점수를 보이고 있음을 확인할 수 있습니다.
모델 | BLEU |
제공된 타겟 시퀀스를 입력으로 사용하는 모델 | 46.8 |
스케줄링된 샘플링을 입력으로 사용하는 모델 | 48.1 |
이전 글 읽기