본문 바로가기

밑바닥부터 시작하는 딥러닝

[밑바닥부터 시작하는 딥러닝2: Chapter 4] word2vec 속도 개선

728x90


본 포스팅은 밑바닥부터 시작하는 딥러닝2을 토대로 공부한 내용을 정리하기 위한 포스팅입니다.
해당 도서에 나오는 Source Code 및 자료는 GitHub를 참조하여 진행하였습니다.
https://github.com/WegraLee/deep-learning-from-scratch-2

4.0 학습 목표


이번 장에서는 word2vec의 고속화를 위해 앞 장의 CBOW 모델을 개선한다. 그 과정에서 Embedding 계층을 구현하고 Negative Sampling이라는 새로운 기법을 도입한다. 이번 장의 핵심은 ‘모두’ 대신 ‘일부’를 처리하는 것이다. Negative Sampling은 ‘모든’ 단어가 아닌 ‘일부’ 단어만을 대상으로 하는 것으로, 계산을 효율적으로 수행해준다.
word2vec으로 얻은 단어의 분산 표현은 다양한 자연어 처리 작업에 이용되고 있고, 다른 분야(음성, 이미지, 영상 등)에도 응용되고 있다. 이는 word2vec가 전이 학습 측면에서 중요하다는 것을 알 수 있다.

4.1 word2vec 개선 1

앞 장에서 배웠던 CBOW 모델을 어휘가 100만 개, 은닉층의 뉴런이 100개라고 가정해보면, word2vec은 [그림 4-2]처럼 된다.  

[그림 4-2]에서 보듯, 수많은 뉴런 때문에 중간 계산에 많은 시간이 소모된다. 정확히는 다음 두 계산이 병목된다.

  • 입력층의 one-hot 표현과 가중치 행렬 W_in의 곱 계산
  • 은닉층과 가중치 행렬 W_out의 곱 및 Softmax 계층의 계산


4.1.1 Embedding 계층

Embedding 계층은 단어를 one-hot 표현으로 바꾸며, 그것을 MatMul 계층에 입력한 후 가중치 행렬을 곱하는 과정을 생략하는 역할을 한다. [그림 4-3]을 보듯, 이 과정이 결과적으로 수행하는 일은 단지 가중치 행렬(W_in)의 특정 행을 추출하는 것뿐이다.  


따라서 one-hot 표현으로의 변환과 MatMul 계층의 행렬 곱 계산은 사실 필요가 없는 것이다.

class Embedding:
    def __init__(self, W):
        self.params = [W]
        self.grads = [np.zeros.like(W)]
        self.idx = None
    
    def forward(self, idx):
        W, = self.params
        self.idx = idx
        out = W[idx]
        return out

    def backward(self, dout):
        dW, = self.grads
        dW[...] = 0
        dW[self.idx] = dout
        return None


  Embedding 계층의 forward는 가중치 W의 특정행을 추출할 뿐이다. 단순히 가중치의 특정 행 뉴런만을 다음 층으로 흘려보내는 것이다. 따라서 역전파에서는 앞 층으로부터 전해진 기울기를 다음 층으로 그대로 흘려주기만 하면 된다. 다만, 앞 층으로부터 전해진 기울기(dh)를 가중치 기울기(dW)의 특정 행(idx번째 행)에 설정한다. 그림으로는 [그림 4-4]처럼 된다.


그런데 앞의 backward() 구현에는 idx의 원소가 중복될 때 문제가 발생하게 된다.


[그림 4-5]와 같이 dh의 각 행 값을 idx가 가리키는 장소에 할당해본다고 하자. 그러면 dW의 0번째 행에 2개의 값이 할당되고, 먼저 쓰여진 값을 덮어쓰게 된다. 이 중복 문제를 해결하려면 ‘할당’이 아닌 ’더하기‘를 해야 한다. 즉, dh의 각 행의 값을 dW의 해당 행에 더해준다. 따라서 올바른 역전파를 구현한 Embedding 계층은 아래와 같다.

class Embedding:
    def __init__(self, W):
        self.params = [W]
        self.grads = [np.zeros.like(W)]
        self.idx = None
    
    def forward(self, idx):
        W, = self.params
        self.idx = idx
        out = W[idx]
        return out

    def backward(self, dout):
        dW, = self.grads
        dW[...] = 0
        
        for i, word_id in enumerate(self.idx): # np.add.at(dW, self.idx, dout)
            dW[word_id] = dout[i]
        
        return None


4.2 word2vec 개선 2


남은 병목은 은닉층 이후의 처리(행렬 곱과 Softmax 계층의 계산)이다. 이 병목은 Negative Sampling이라는 기법을 사용해 해소한다. Softmax 대신 Negative Sampling을 이용하면 어휘가 아무리 많아져도 계산량을 낮은 수준에서 일정하게 억제할 수 있다.

4.2.2 다중 분류에서 이진 분류로


Negative Sampling의 핵심 아이디어는 ‘다중 분류(multi classification)를 이진 분류(binary classification)로 근사하는 것'이다. 지금까지는 맥락으로 "you"와 "goodbye"를 주면 정답인 “say"의 확률이 높아지도록 신경망을 학습시켰다. 즉, 이 신경망은 ”맥락이 ‘you'와 'goodbye'일 때, target 단어는 무엇입니까?“라는 다중 분류 질문에 올바른 답을 낸다.

이제부터는 ’다중 분류‘ 문제를 ’이진 분류‘ 방식으로 해결하는 것을 생각해보자. 예컨대 ”맥락이 'you'와 'goodbye'일 때, 타깃 단어는 ’say'입니까?“라는 질문에 "Yes/No"로 답하는 신경망을 생각해내야 한다. 이렇게 하면 출력층에는 뉴런을 하나만 준비하면 된다. 출력층의 이 뉴런이 "say"의 점수를 출력하는 것이다.


[그림 4-7]에서 보듯 출력층의 뉴런은 하나뿐이다. 따라서 은닉층과 출력 측의 가중치 행렬의 내적은 "say"에 해당하는 열(단어 벡터)만을 추출하고, 그 추출된 벡터와 은닉층 뉴런과의 내적을 계산하면 끝이다.

[그림 4-8]처럼 출력 측의 가중치 W_out에서는 각 단어 ID의 단어 벡터가 각각의 열로 저장되어 있다. 이 예에서는 "say"에 해당하는 단어 벡터를 추출한다. 그리고 그 벡터와 은닉층 뉴런과의 내적을 구한 값이 최종 점수가 된다.

4.2.3 시그모이드 함수와 교차 엔트로피 오차


이진 분류 문제를 신겸앙으로 풀려면 점수에 시그모이드 함수를 적용해 확률로 변환하고, 손실을 구할 때는 손실 함수로 ‘Cross Entropy Error'를 사용한다. 이 둘은 이진 분류 신경망에서 가장 흔하게 사용하는 조합이다.

[식 4.2] Sigmoid 함수

시그모이드 함수를 적용해 확률 y를 얻으면, 이 확률 y로부터 손실을 구한다. 시그모이드 함수에 사용되는 손실 함수는 다중 분류 때처럼 ‘Cross Entropy Error'이다. 이는 다음과 같이 쓸 수 있다.

[식 4.3] Cross Entropy Error(출력층에 뉴런을 2개만 사용하는 경우)

여기에서 y는 시그모이드 함수의 출력이고, t는 정답 레이블이다. 이 정답 레이블의 값은 0 혹은 1이므로, 따라서 t가 1이면 -log y가 출력되고, 반대로 t가 0이면 -log (1-y)가 출력된다.
Sigmoid 계층과 Cross Entropy Error 계층의 계산 그래프는 아래와 같다.

[그림 4-10]에서 주목할 점은 역전파의 y-t값이다. 이는 신경망이 출력한 확률에서 정답 레이블을 뺀 값이다. 예컨대 정답 레이블이 1이라면, y가 1에 가까워질수록 오차가 줄어든다는 뜻이다. 반대로 y가 1로부터 멀어지면 오차가 커지게 된다.

class SigmoidWithLoss:
    def __init__(self):
        self.params, self.grads = [], []
        self.loss = None
        self.y = None  # sigmoid의 출력
        self.t = None  # 정답 데이터

    def forward(self, x, t):
        self.t = t
        self.y = 1 / (1 + np.exp(-x))

        self.loss = cross_entropy_error(np.c_[1 - self.y, self.y], self.t)

        return self.loss

    def backward(self, dout=1):
        batch_size = self.t.shape[0]

        dx = (self.y - self.t) * dout / batch_size
        return dx


입력층을 Embedding 계층으로 구현하고 다중 분류를 이진 분류로 바꾼 word2vec의 전체 그림은 아래와 같다.

여기에서는 은닉층 뉴런 h와, 출력 측의 가중치 W_out에서 단어 "say"에 해당하는 단어 벡터와의 내적을 계산한다. 그리고 그 출력을 Sigmoid with Loss 계층에 입력해 최종 손실을 얻는다.

이 책에서는 신경망의 후반부를 더욱 간단하게 하기 위해 Embedding Dot 계층을 도입한다. 이 계층은 [그림 4-12]의 Embedding 계층과 'dot 연산(내적)‘의 처리를 합친 계층이다.


은닉층 뉴런 h는 Embedding Dot 계층을 거쳐 Sigmoid with Losss 계층을 통과한다. Embedding Dot 계층의 구현은 아래와 같다.

class EmbeddingDot:
    def __init__(self, W):
        self.embed = Embedding(W)
        self.params = self.embed.params
        self.grads = self.embed.grads
        self.cache = None
    
    def forward(self, h, idx):
        target_W = self.embed.forward(idx)
        out = np.sum(h*target_W, axis = 1)
        
        self.cache = (h, target_W)
        return out
    
    def backward(self, dout):
        h, target_W = self.cache
        dout = dout.reshape(dout.shape[0], 1)
        dtarget_W = dout * h
        self.embed.backward(dtarget_W)
        dh = dout * target_W
        return dh


embed는 Embedding 계층을, cache는 순전파 시의 계산 결과를 잠시 유지하기 위한 변수로 사용한다. 순전파를 담당하는 forward(h, idx) 메서드는 인수로 은닉층 뉴런(h)과 단어 ID의 넘파이 배열(idx)을 받는다. 여기에서 idx는 미니배치 처리를 가정한 단어 ID의 ‘배열’로 생각한다.

4.2.5 Negative Sampling


지금까지 배운 것으로 주어진 문제를 ‘다중 분류’에서 ‘이진 분류’로 변환할 수 있지만, 아직 긍정적인 예(정답)에 대해서만 학습을 했다. 다시 말해 부정적인 예(오답)를 입력하면 어떤 결과가 나올지도 생각해봐야 한다.

이진 분류의 목표는 긍정적인 예에 대해서는 Sigmoid 계층의 출력을 1에 가깝게 만들고, 부정적인 예에 대해서는 Sigmoid 계층의 출력을 0에 가깝게 만드는 것이다. 그림으로는 [그림 4-16]과 같다.


그럼 이제 오답일 때의 학습도 진행해야 한다는 것을 알았다. 그럼 어떻게 학습을 시켜야 효율적이게 할 수 있는지 생각해보자. 만약 모든 부정적인 예를 대상으로 이진 분류를 학습시키게 된다면 어휘 수 증가를 감당할 수 없게 될 것이다. 따라서 근사적인 해법으로 접근해, 부정적인 예를 몇 개 선택해서 학습하는 것이 좋다. 즉, 적은 수의 부정적인 예를 샘플링해 사용한다. 이것이 바로 ‘Negative Sampling' 기법이 의미하는 바이다.

정리하면,  Negative Sampling 기법은 긍정적 예를 target으로 한 경우의 손실을 구한다. 그와 동시에 부정적인 예를 몇 개 샘플링하여, 그 부정적 예에 대해서도 마찬가지로 손실을 구한다. 그리고 각각의 데이터(긍정적 예와 샘플링된 부정적 예)의 손실을 더한 값을 최종 손실로 한다.

Negative Sampling의 계산 그래프는 [그림 4-17]처럼 그릴 수 있다.

4.2.6 Negative Sampling의 샘플링 기법


앞에서 부정적인 예를 몇 개 선택한다고 말했는데, 이는 단순히 무작위로 샘플링하는 것이 아니라 확률 분포에 따라 샘플링하는 것을 의미한다. 구체적으로 말하면, 말뭉치에서 자주 등장하는 단어를 많이 추출하고 드물게 등장하는 단어를 적게 추출하는 것이다. 말뭉치에서의 단어 빈도를 기준으로 샘플링하려면, 먼저 말뭉치에서 각 단어의 출현 횟수를 구해 ‘확률분포’로 나타내야 한다. 그런 다음 그 확률분포대로 단어를 샘플링하면 된다.


확률분포대로 샘플링하므로 말뭉치에서 자주 등장하는 단어는 선택될 가능성이 높지만, 같은 이유로 ‘희소한 단어’는 선택되기가 어렵다.

그런데 word2vec의 Negative Sampling에서는 앞의 확률분포에서 한 가지를 수정할 것을 권고하고 있다. [바로 식 4.4]처럼 기본 확률분포에 0.75를 제곱하는 것이다.

[식 4.4]

여기서 P(w_i)는 i번째 단어의 확률을 뜻한다. [식 4.4]는 단순히 원래 확률분포의 각 요소를 ‘0.75 제곱’할 뿐이다. [식 4.4]처럼 수정하는 이유는, 원래 확률이 낮은 단어의 확률을 살짝 높이기 위해서이다.

실행 결과를 보면 원래 0.7의 확률을 가진 원소는 0.75 제곱 후 0.57로 줄어들고, 원래 0.1의 확률을 가진 원소는 0.13으로 늘어난 것을 볼 수 있다.

UnigramSampler 클래스는 get_negative_sampler(target) 메서드를 제공한다. 이 메서드는 target 인수로 지정한 단어를 긍정적 예로 해석하고, 그 외의 단어 ID를 샘플링한다. 실행결과는 아래와 같다.

class UnigramSampler:
    def __init__(self, corpus, power, sample_size):
        self.sample_size = sample_size
        self.vocab_size = None
        self.word_p = None

        counts = collections.Counter()
        for word_id in corpus:
            counts[word_id] += 1

        vocab_size = len(counts)
        self.vocab_size = vocab_size

        self.word_p = np.zeros(vocab_size)
        for i in range(vocab_size):
            self.word_p[i] = counts[i]

        self.word_p = np.power(self.word_p, power)
        self.word_p /= np.sum(self.word_p)

    def get_negative_sample(self, target):
        batch_size = target.shape[0]

        if not GPU:
            negative_sample = np.zeros((batch_size, self.sample_size), dtype=np.int32)

            for i in range(batch_size):
                p = self.word_p.copy()
                target_idx = target[i]
                p[target_idx] = 0
                p /= p.sum()
                negative_sample[i, :] = np.random.choice(self.vocab_size, size=self.sample_size, replace=False, p=p)
        else:
            # GPU(cupy)로 계산할 때는 속도를 우선한다.
            # 부정적 예에 타깃이 포함될 수 있다.
            negative_sample = np.random.choice(self.vocab_size, size=(batch_size, self.sample_size),
                                               replace=True, p=self.word_p)

        return negative_sample
        
corpus = np.array([0, 1, 2, 3, 4, 1, 2, 3])
power = 0.75
sample_size = 2

sampler = UnigramSampler(corpus, power, sample_size)
target = np.array([1, 3, 0])
negative_sample = sampler.get_negative_sample(target)
print(negative_sample)


결과를 보듯 이제 부정적 예를 샘플링할 수 있게 되었다.


4.2.7 Negative Sampling 구현


NegativeSamplingLoss라는 클래스로 구현한다.

class NegativeSamplingLoss:
    def __init__(self, W, corpus, power=0.75, sample_size=5):
        self.sample_size = sample_size
        self.sampler = UnigramSampler(corpus, power, sample_size)
        self.loss_layers = [SigmoidWithLoss() for _ in range(sample_size + 1)]
        self.embed_dot_layers = [EmbeddingDot(W) for _ in range(sample_size + 1)]

        self.params, self.grads = [], []
        for layer in self.embed_dot_layers:
            self.params += layer.params
            self.grads += layer.grads

    def forward(self, h, target):
        batch_size = target.shape[0]
        negative_sample = self.sampler.get_negative_sample(target)

        # 긍정적 예 순전파
        score = self.embed_dot_layers[0].forward(h, target)
        correct_label = np.ones(batch_size, dtype=np.int32)
        loss = self.loss_layers[0].forward(score, correct_label)

        # 부정적 예 순전파
        negative_label = np.zeros(batch_size, dtype=np.int32)
        for i in range(self.sample_size):
            negative_target = negative_sample[:, i]
            score = self.embed_dot_layers[1 + i].forward(h, negative_target)
            loss += self.loss_layers[1 + i].forward(score, negative_label)

        return loss

    def backward(self, dout=1):
        dh = 0
        for l0, l1 in zip(self.loss_layers, self.embed_dot_layers):
            dscore = l0.backward(dout)
            dh += l1.backward(dscore)

        return dh


초기화 메서드의 인수로는 출력 측 가중치를 나타내는 W, 말뭉치를 뜻하는 corpus, 확률분포에 제곱할 값인 power, 그리고 부정적 예의 샘플링 횟수인 sample_size이다. 여기에서는 앞 절에서 설명한 UnigramSampler 클래스를 생성하여 인스턴스 변수인 sampler로 저장한다. 또한 부정적 예의 샘플링 횟수는 인스턴스 변수인 sample_size에 저장한다.
인스턴스 변수인 loss_layers와 embed_dot_layers에는 원하는 계층을 리스트로 보관한다. loss_layers[0]과 embed_dot_layers[0]은 긍정적 예를 다루는 계층이고, 다음 계층부터는 부정적 예를 다룬다.

forward 메서드에서는 은닉층 뉴런 h와 긍정적 예의 타깃을 뜻하는 target을 인수로 받는다. 이 메서드에서는 우선 self.sampler를 이용해 부정적 예를 샘플링하여 negative_sample에 저장한다. 그런 다음 긍정적 예와 부정적 예 각각의 데이터에 대해서 순전파를 수행해 그 손실들을 더한다. 구체적으로는 Embedding Dot 계층의 forward 점수를 구하고, 이어서 이 점수와 레이블을 Sigmoid with Loss 계층으로 흘려 손실을 구한다.

backward 메서드에서는 순전파 때의 역순으로 각 계층의 backward()를 호출하기만 하면 된다.


4.3 개선판 word2vec 학습


따라서 개선된 CBOW 클래스의 모습은 다음과 같다.

# coding: utf-8
import sys
sys.path.append('..')
from common.np import *  # import numpy as np
from common.layers import Embedding
from ch04.negative_sampling_layer import NegativeSamplingLoss


class CBOW:
    def __init__(self, vocab_size, hidden_size, window_size, corpus):
        V, H = vocab_size, hidden_size

        # 가중치 초기화
        W_in = 0.01 * np.random.randn(V, H).astype('f')
        W_out = 0.01 * np.random.randn(V, H).astype('f')

        # 계층 생성
        self.in_layers = []
        for i in range(2 * window_size):
            layer = Embedding(W_in)  # Embedding 계층 사용
            self.in_layers.append(layer)
        self.ns_loss = NegativeSamplingLoss(W_out, corpus, power=0.75, sample_size=5)

        # 모든 가중치와 기울기를 배열에 모은다.
        layers = self.in_layers + [self.ns_loss]
        self.params, self.grads = [], []
        for layer in layers:
            self.params += layer.params
            self.grads += layer.grads

        # 인스턴스 변수에 단어의 분산 표현을 저장한다.
        self.word_vecs = W_in

    def forward(self, contexts, target):
        h = 0
        for i, layer in enumerate(self.in_layers):
            h += layer.forward(contexts[:, i])
        h *= 1 / len(self.in_layers)
        loss = self.ns_loss.forward(h, target)
        return loss

    def backward(self, dout=1):
        dout = self.ns_loss.backward(dout)
        dout *= 1 / len(self.in_layers)
        for layer in self.in_layers:
            layer.backward(dout)
        return None


기존의 CBOW에서 개선한 점은 Embedding 계층과 Negative Sampling Loss 계층을 적용하는 것이다.

가중치 초기화가 끝나면, 이어서 계층을 생성한다. 여기에서는 Embedding 계층을 2 * window_size개 작성하여 인스턴스 변수인 in_layers에 배열로 보관한다. 그런 다음 Negative Sampling Loss 계층을 생성한다.
계층을 다 생성했으면, 이 신경망에서 사용하는 모든 매개변수와 기울기를 인스턴스 변수인 params와 grads에 모은다. 또한 나중에 단어의 분산 표현에 접근할 수 있도록 인스턴스 변수인 word_vecs에 가중치 W_in을 할당한다.
순전파와 역전파는 기존 CBOW와 마찬가지로 순차적으로 계층들을 호출한다는 점에서 동일하다. 하지만 개선된 CBOW에서는 인수로 받는 맥락과 타깃이 one-hot 벡터가 아니라 단어 ID 자체라는 점에서 차이가 있다.


word2vec으로 얻은 단어의 분산 표현은 비슷한 단어를 가까이 모을 뿐 아니라, 더 복잡한 패턴을 파악하는 것으로 알려져 있다. 대표적인 예가 “king - man + woman = queen"으로 유명한 유추 문제이다. word2vec의 단어의 분산 표현을 사용하면 유추 문제를 벡터의 덧셈과 뺄셈으로 풀 수 있다.

실제로 유추 문제를 풀려면 [그림 4-20]처럼 단어 벡터 공간에서 ”man -> woman" 벡터와 "king -> ?"벡터가 가능한 한 가까워지는 단어를 찾는다. 즉, 수식으로 나타내면 "vec('woman') - vec('man') = vec('?') - vec('king')"라는 벡터에 가장 가까운 단어 벡터를 구하는 일이 된다.

728x90