DCGANでテキスト生成を試してみた

こんにちは、Link-Uの町屋敷です。

今回は生成モデルで有名なGANさんで、Word2Vecを使って英語の文章を生成するものを見つけたので、日本語でもできるのかやってみました。

DCGANのサンプルを動かしてみる

今回はMITライセンスのこのプロジェクトのDCGANをフォークして使っていきます。

まずはそのままのコードを確認していきましょう。

DCGANはGANの一種です。
GAN(Generative Adversarial Networks)は、2つの学習器を互いに争わせることで、学習を進めていきます。2つの学習器とは、
入力された乱数からほしいもの(画像や文章など)を作るジェネレーター(以降G:生成器)と
画像や文章などを入力とし、それがGから生成されたなのかデータセットとして収集してきた本物なのかを見分けるディスクリミネーター(D:識別器)です。

サンプルでは、build_generator()でGの、build_discrimanator内でDの構造が定義されています。

今回はDCGANなので、両方の学習器で畳み込み層を使用しますがプーリング層はありません。

このサンプルでは、コンストラクタで使うネットワークの形と学習方法、損失関数などを定義しています。

Dでは、生成された画像と本物の画像を見分けることを目ざします。生成された画像に0のラベルを、本物の画像に1のラベルをつければ、通常の画像分類と同様なので実装は簡単です。

    # Build and compile the discriminator
        self.discriminator = self.build_discriminator()
        self.discriminator.compile(loss='binary_crossentropy',
            optimizer=optimizer2,
            metrics=['accuracy'])

Gもやっていることは簡単で、ノイズを入力して生成された画像をDに入れて、より高い確率で本物の画像たど判別されるように学習します。

プログラムにするとこうです。

何も考えずに作ってしまうと、Gのモデルの中にDが含まれているので、Gを学習するときにDも学習されてしまいます。

そこでGを学習するときにはDの値を変えないようself.discriminator.trainable = Falseで制限をかけています。

        # The generator takes noise as input and generates imgs
        z = Input(shape=(self.latent_dim,)) #雑音の形の定義
        img = self.generator(z) #生成された画像がimg

         # For the combined model we will only train the generator
        self.discriminator.trainable = False

        # The discriminator takes generated images as input and determines validity
        valid = self.discriminator(img) #Validが判定結果

        # The combined model  (stacked generator and discriminator)
        # Trains the generator to fool the discriminator
        self.combined = Model(z, valid) #入力が雑音で、出力が判定結果のモデル
        self.combined.compile(loss='binary_crossentropy', optimizer=optimizer)

あとは、train関数内でデータを入れたら完成です。

実際に走らせてみましょう。

0Epoch

ただの雑音が

200Epoch

4000Epoch

読める数字になっていっています。

文章生成用に改造する

さてここからが本題です。

先程のサンプルでは28*28の画像を入力にしていましたが、代わりに文字を変換して入力します。

そのままの文字を入力してもどうにもならないので、何らかの変換をしないといけないんですが、それが今回使うWord2Vecです。

正解の文章は前にWikipediaから取ってきた漫画の記事(約5000記事)のデータを使いまわします。

Word2Vecはその名の通り単語をベクトルに変換する技術で、これを使うことによって単語と単語の計算ができるようになります。

        all_sentences = joblib.load('{0}/all_sentences.pkl'.format(WRITE_JOBLIB_DIR))
        sentences = [[word for word in document.lower().split()] for document in all_sentences]
        
        print("Building Word2Vec")
        word_model = Word2Vec(sentences, size=63, min_count=1, window=5) 
        joblib.dump(word_model, '{0}/word2vec.pkl'.format(WRITE_JOBLIB_DIR))

変換はgensimを使って行います。Word2Vec関数に文章を渡せば辞書が出来ます。。簡単!

先にMecabでの分かち書きを忘れないよう注意。引数のsizeは変換後のベクトルの長さ、windowは。文章中の単語と単語の関係性を計算する距離を表す。この例だと5単語まで。

def VectorizeWord(self):
        all_sentences = joblib.load('{0}/all_sentences.pkl'.format(WRITE_JOBLIB_DIR))
        sentences = [[word for word in document.lower().split()] for document in all_sentences]
        word_model = joblib.load('{0}/word2vec.pkl'.format(WRITE_JOBLIB_DIR))
               
        n_words = 8
        # converted_sentences = [] 
        # converted_sentence = np.zeros(word_model.syn0.shape[0])
        input_data = []
        for s in sentences:
            vectorized_sentence = []
            word_count = 0
            for w in s:
                vector = word_model.wv[w]
                vectorized_sentence.append(vector)
                word_count += 1
                if w == '。':
                    if word_count < n_words:
                        for i in range(n_words - word_count):
                            vector = word_model.wv['。']
                            vectorized_sentence.append(np.append(vector, -1))
                            word_count += 1
                    if word_count == n_words:
                        input_data.append(vectorized_sentence)
                        vectorized_sentence = []
                        word_count = 0
                    elif word_count > n_words:
                        vectorized_sentence = []
                        word_count = 0
        print(np.shape(input_data))
        print(n_words * (word_model.layer1_size + 1))
        input_data = np.reshape(input_data, (-1, n_words * (word_model.layer1_size + 1)))  # 品詞IDの分がたされる
        print(np.shape(input_data))
        print(n_words * len(sentences))
        word_model = joblib.dump(input_data, '{0}/vectorized_words.pkl'.format(WRITE_JOBLIB_DIR))
        print(1)
            

作った辞書を使って単語をベクトルに変換していきます。(vector = word_model.wv[w])の部分。

ついでに1文章に含まれる単語数はまちまちなので、ある一定数に揃えます。一定数に満たない文章は空白で埋めたかったんですが、辞書に登録されてない単語は変換するときにエラーになるので「。」で埋めました。「行き,まし,た,。」 => 「行き,まし,た,。,。,。,。,。」

これで準備完了。先程の画像の代わりに変換した単語を入れていきます。

さてここで問題になるのは、どの形で、単語を渡すかです。

一つは、一単語を行ベクトルを列として(単語数*ベクトル数*1)の画像みたいに渡す方法。

ベクトルを画像で言うRGBみたいなものだと考えて(1*単語数*ベクトル数)もあります。

結論から言うと前者はあまりよろしくなかったです。

生成例がこれなんですけど

epoch0

・ メルチェリーダぷれりゅうどぷれりゅうどガンムマーズクロニクル歌野ゃがんはがちりんにとぶアプトアプトばろそぐいアダルトグッズショップばろケルビム・ゲート・キーパーゃがんはがちりんにとぶキザキ売り込もゃがんはがちりんにとぶゃがんはがちりんにとぶオゲ売り込もキザキ売り込も受け流そ歌野歌野ザ・ビーンズ歌野臨もザ・ビーンズ歌野ザ・ビーンズメルチェリーダ弖虎弖虎弖虎弖虎弖虎メルチェリーダアオノキセキウキウキペディアカードウキウキペディアカード

epoch400

・ 使用二鷹野大遠藤広隆小暮昌広程度桁異なり本当に他紙足取り後ろ他付属ボーナストラックトラック柔弱。。。

epoch0でも兆候が見えてるですが、epoch400ではなんか前の方に名詞固まってるし、トラックトラックみたいな繰り返しがよく起こってます。

とくにこの繰り返しが至るところに出現します。多分ゲネレーターの畳み込み層のせいだと思ったので、入力を後者の方に変更。単語数64も多すぎたので8まで減らした。

結果はこちら

epoch 0

・ 若作り繋っシャンツァカッシーギャグイニチアシブノレ混交デイブレイク

・ 年代デイブレイクビジネスインフラ・リミテッドオフィシャルパンフレット商業ドルドルーヴォドラゴン・サーガジャケットイラストギャラリー

・ 交ぜるスパークオンウェイヴオフィシャルパンフレット描くソガシイナ京田辺イリヤルートドラゴンボールメニュー

・ 福見オールドマン・パーサキュライナーツノートぬおイニチアシブゲームエッセイマンガ木城

・ あかしあ台ノスタルジーゅせんきょうジミナニードルアイフレスポ鬩ホラー

epoch 100

・ 選評再現カレイドスター・ウィンディステージ数。。。。

・ ホワイトナックル・クリムゾンオーブハンター両替キャストバトルオブフェアリーテイル選り抜いゲームパッケージイラスト全て。

・ カードダスステーション終盤ぶった斬る以後後半フルタッチ・アクション。。

・ 温州ニコラ・ケフェウス一般に約前後。。。

・ 時点キディ・ガーランドとおり前後予定同権。。

epoch 200

・ キャラウムカフェウルトラジャンプエッグサイト差し置き。日本一が上回り驚愕

・ ワイディーネ学期エロティック・コメディ。メジャーが上回りから

・ キャラウムカフェ学期エロティック・コメディガールズ・デート年代は上回り共に

・ ドルビーステレオコンサンタルト学期エロティック・コメディガールズ・デートアニヴァーサリーが上回り共に

・ キャラウムカフェ学期エロティック・コメディガールズ・デート以後が上回り奪え

さっきよりは多少マシになったが、まだまだ不自然。

epoch200ではモード崩壊気味でなぜか末尾の「。」が消えた。

ここから改善を始めていくが続きは来月に。

まとめ

一応生成は出来たが精度はまだまだ。

改善点は色々あるので来月はそれをやっていこうと思う。

具体的にはパラメーター調整とかMeCabの情報を使ってConditional GAN化してみるとかそもそもDCGANをやめるとかかなあ。

プログラム全文

'''
MIT License

Copyright (c) 2017 Erik Linder-Norén

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
'''

import re
import json

from timeit import default_timer as timer

import keras.backend as K

import numpy as np
import matplotlib
from matplotlib import pyplot as plt
import joblib

from natto import MeCab
import gensim
from gensim.models.word2vec import Word2Vec

try:
    from keras.engine.topology import Container
except:
    from keras.engine.network import Network as Container
from keras.datasets import mnist
from keras.layers.recurrent import LSTM
from keras.layers.embeddings import Embedding
from keras.models import Model, Sequential
from keras.layers import Dense, Activation, Reshape, UpSampling2D, Conv2D, BatchNormalization, Input, Dropout, ZeroPadding2D, Flatten
from keras.optimizers import Adam
from keras.layers.advanced_activations import LeakyReLU
from keras.losses import binary_crossentropy

TEXT_JSON_DIR = '../WikipediaComic/whole_data'
INFOBOX_JSON_DIR = '.'
INFOBOX_FILE_NAME = 'wiki_infobox_Infobox_animanga_Manga.json'
WRITE_JSON_DIR = '.'
WRITE_JSON_FILE_NAME = 'joined.json' 
WRITE_TEXT_RESULT_DIR = '.'
AUTO_ENCODER_DIR = '.'

WRITE_JOBLIB_DIR = '.'

# Matplotlibの日本語設定
font_path = '/usr/share/fonts/truetype/takao-gothic/TakaoPGothic.ttf'
font_prop = matplotlib.font_manager.FontProperties(fname=font_path)
matplotlib.rcParams['font.family'] = font_prop.get_name()

def cross_entropy_plus_simmilarity(y_true, y_pred, X_pred):
    simi = 0
    for i , v in enumerate(X_pred):
        for j, _ in enumerate(X_pred):
            if i > j:
                simi += K.square(X_pred[i] - X_pred[j])
    n = len(X_pred)
    return simi/(n*(n-1)/2) + binary_crossentropy(y_true, y_pred)

class DCGAN():

    def __init__(self, row, col, additional_featire_count = 0):
        self.additional_feature_count = additional_featire_count
        # Input shape
        self.img_rows = 1
        self.img_cols = col
        self.channels = row + self.additional_feature_count
        
        self.txt_shape = (self.img_cols, 1, self.channels)
        self.latent_dim = 200

        optimizer = Adam(0.0002, 0.5)

        # Build the generator
        self.generator = self.build_generator()
#        self.generator = self.WordGenerator()

        # Build and compile the discriminator
        self.discriminator = self.build_discriminator()
        self.discriminator.compile(loss='binary_crossentropy',
            optimizer=optimizer,
            metrics=['accuracy'])

        # The generator takes noise as input and generates imgs
        z = Input(shape=(self.latent_dim,))
        img = self.generator(z)

        # For the combined model we will only train the generator
        self.discriminator.trainable = False

        # The discriminator takes generated images as input and determines validity
        valid = self.discriminator(img)

        # The combined model  (stacked generator and discriminator)
        # Trains the generator to fool the discriminator
        self.combined = Model(z, valid)
        self.combined.compile(loss='binary_crossentropy', optimizer=optimizer)
        
    def build_generator(self):
    
        model = Sequential()
    
        model.add(Dense(128 * int((self.img_cols) * self.img_rows), activation="relu", input_dim=self.latent_dim))
        model.add(Reshape((self.img_cols , self.img_rows, 128)))
        model.add(Conv2D(128, kernel_size=3, padding="same"))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Activation("relu"))
        model.add(Conv2D(64, kernel_size=3, strides=1, padding="same"))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Activation("relu"))
        model.add(Conv2D(32, kernel_size=3, strides=1, padding="same"))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Activation("relu"))
        model.add(Conv2D(self.channels, kernel_size=3, padding="same"))
        model.add(Activation("tanh"))
    
        model.summary()
    
        noise = Input(shape=(self.latent_dim,))
        img = model(noise)
    
        return Model(noise, Container(noise, img)(noise)) 

    def build_discriminator(self):

        model = Sequential()
        print(self.txt_shape)
        model.add(Conv2D(32, kernel_size=3, strides=2, input_shape=self.txt_shape, padding="same"))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Conv2D(64, kernel_size=3, strides=2, padding="same"))
        model.add(ZeroPadding2D(padding=((0, 1), (0, 1))))
        model.add(BatchNormalization(momentum=0.8))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dropout(0.5))
        model.add(Conv2D(128, kernel_size=3, strides=2, padding="same"))
        model.add(BatchNormalization(momentum=0.8))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Flatten())
        model.add(Dense(1, activation='sigmoid'))

        model.summary()

        img = Input(shape=self.txt_shape)
        validity = model(img)

        return Model(img, Container(img, validity)(img))

    def train(self, epochs, batch_size=128, save_interval=50):
        word_model = joblib.load('{0}/word2vec.pkl'.format(WRITE_JOBLIB_DIR))
        X_train = joblib.load('{0}/vectorized_words.pkl'.format(WRITE_JOBLIB_DIR))
        print(np.shape(X_train))
        X_train = np.reshape(X_train, (-1, self.img_cols , 1, self.channels))
        print(np.shape(X_train))
        # Adversarial ground truths
        real = np.ones((batch_size, 1))
        fake = np.zeros((batch_size, 1))

        for epoch in range(epochs):

            # ---------------------
            #  Train Discriminator
            # ---------------------

            # Select a random half of images
            idx = np.random.randint(0, X_train.shape[0], batch_size)
            imgs = X_train[idx]

            # Sample noise and generate a batch of new images
            noise = np.random.normal(0, 1, (batch_size, self.latent_dim))
            gen_imgs = self.generator.predict(noise)

            # Train the discriminator (real classified as ones and generated as zeros)
            self.discriminator.trainable = True
            self.generator.non_trainable = False
            d_loss_real = self.discriminator.train_on_batch(imgs, real)
            d_loss_fake = self.discriminator.train_on_batch(gen_imgs, fake)
            d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

            # ---------------------
            #  Train Generator
            # ---------------------

            # Train the generator (wants discriminator to mistake images as real)
            self.discriminator.trainable = False
            self.generator.non_trainable = True
            g_loss = self.combined.train_on_batch(noise, real)

            # Plot the progress
            print ("%d [D loss: %f, acc.: %.2f%%] [G loss: %f]" % (epoch, d_loss[0], 100 * d_loss[1], g_loss))

            # If at save interval => save generated image samples
            if epoch % save_interval == 0:
                self.show_text(epoch)
    
    def show_text(self, epoch):
        r, c = 5, 5
        noise = np.random.normal(0, 1, (r * c, self.latent_dim))
        gen_text = self.generator.predict(noise)
        word_model = joblib.load('{0}/word2vec.pkl'.format(WRITE_JOBLIB_DIR))
        r, c = 5, 5
        noise = np.random.normal(0, 1, (r * c, self.latent_dim))
        gen_text = self.generator.predict(noise)
        word_model = joblib.load('{0}/word2vec.pkl'.format(WRITE_JOBLIB_DIR))
        with open(WRITE_TEXT_RESULT_DIR + '/generated_epoch{0}.txt'.format(epoch), "w+") as f:
            f.write('Epoch {0}rn'.format(epoch))
            for vector in gen_text:
                word = ''
                s = vector.flatten().reshape(self.img_cols, self.img_rows, self.channels)
                for w in s:   
                    if self.additional_feature_count > 0:
                        word = word + word_model.most_similar(w[:,0:-1*self.additional_feature_count])[0][0]
                    else:
                        word = word + word_model.most_similar(w)[0][0]
                print(word + 'rn')
                f.write(word)
    
    def save_imgs(self, epoch):
        r, c = 5, 5
        noise = np.random.normal(0, 1, (r * c, self.latent_dim))
        gen_imgs = self.generator.predict(noise)

        # Rescale images 0 - 1
        gen_imgs = 0.5 * gen_imgs + 0.5

        fig, axs = plt.subplots(r, c)
        cnt = 0
        for i in range(r):
            for j in range(c):
                axs[i, j].imshow(gen_imgs[cnt, :, :, 0], cmap='gray')
                axs[i, j].axis('off')
                cnt += 1
        fig.savefig("images/mnist_%d.png" % epoch)
        plt.close()


class ProcessWord():

    def __init__(self, word_max, vector_size, do_use_pos = True):
        self.mecab = MeCab()
        self.word_max = word_max
        self.vector_size = vector_size
        self.do_use_pos = do_use_pos
        
    def DcganTrigger(self, epochs=4000, batch_size=64, save_interval=10):
        if self.do_use_pos:
            dg = DCGAN(self.vector_size, self.word_max, 1)
        else:
            dg = DCGAN(self.vector_size, self.word_max, 0)
        dg.train(epochs=epochs, batch_size=batch_size, save_interval=save_interval)

    def ExtractWords(self):
        with open(WRITE_JSON_DIR + '/' + WRITE_JSON_FILE_NAME, 'r', encoding='utf_8') as jw: 
            json_data = json.load(jw)
        
        all_words = [[0]] * len(json_data) 
        with MeCab("-Owakati") as mecab:
            for i, j in enumerate(json_data):
                if i % 100 == 0:
                    print(i)
                text = j['text']
                
                all_words[i] = re.sub(r'[!-~]', "", mecab.parse(text))  # 記号、英語除去
                
        joblib.dump(all_words, '{0}/all_sentences.pkl'.format(WRITE_JOBLIB_DIR), compress=True)
        
    def CreateWord2Vec(self):
        all_sentences = joblib.load('{0}/all_sentences.pkl'.format(WRITE_JOBLIB_DIR))
        sentences = [[self.AnalyzeWord(word)[1] for word in document.lower().split()] for document in all_sentences]
        
        print("Building Word2Vec")
        word_model = Word2Vec(sentences, size=self.vector_size, min_count=1, window=5)
        joblib.dump(word_model, '{0}/word2vec.pkl'.format(WRITE_JOBLIB_DIR))
        # Code tried to prepare LSTM model for word generation
        
    def VectorizeWord(self):
        all_sentences = joblib.load('{0}/all_sentences.pkl'.format(WRITE_JOBLIB_DIR))
        sentences = [[word for word in document.lower().split()] for document in all_sentences]
        word_model = joblib.load('{0}/word2vec.pkl'.format(WRITE_JOBLIB_DIR))
               
        n_words = self.word_max
        # converted_sentences = [] 
        # converted_sentence = np.zeros(word_model.syn0.shape[0])
        input_data = []
        pos_labels = [] 
        for s in sentences:
            vectorized_sentence = []
            pos = []
            word_count = 0
            #print(s)
            for w in s:
                posid, surface = self.AnalyzeWord(w)
                vector = word_model.wv[surface]
                if self.do_use_pos:
                    vector = np.append(vector, float((posid - 34) / 68))  # POSID正規化
                pos.append(float((posid - 34) / 68))
                vectorized_sentence.append(vector)
                word_count += 1
                if w == '。':
                    if word_count < n_words:
                        for i in range(n_words - word_count):
                            vector = word_model.wv['。']
                            if self.do_use_pos:
                                vectorized_sentence.append(np.append(vector, -1))
                            else:
                                vectorized_sentence.append(vector)
                            pos.append(-1)
                            word_count += 1
                    if word_count == n_words:
                        input_data.append(vectorized_sentence)
                        pos_labels.append(pos)
                        vectorized_sentence = []
                        pos =  []
                        word_count = 0
                    elif word_count > n_words:
                        vectorized_sentence = []
                        pos = []
                        word_count = 0
                        
        print(np.shape(input_data))
        print(n_words * (word_model.layer1_size + 1))
        if self.do_use_pos:
            input_data = np.reshape(input_data, (-1, n_words * (word_model.layer1_size + 1)))  # 品詞IDの分がたされる
        else:
            input_data = np.reshape(np.array(input_data), (-1, n_words * (word_model.layer1_size)))
        print(np.shape(input_data))
        print(n_words * len(sentences))
        word_model = joblib.dump(input_data, '{0}/vectorized_words.pkl'.format(WRITE_JOBLIB_DIR))
        joblib.dump(input_data, '{0}/vectorized_words.pkl'.format(WRITE_JOBLIB_DIR))
        joblib.dump(pos_labels, '{0}/pos_labels.pkl'.format(WRITE_JOBLIB_DIR))
        print(1)
        
    def CheckWord2Vec(self, word1, word2, ope):
        word_model = joblib.load('{0}/word2vec.pkl'.format(WRITE_JOBLIB_DIR))
        if ope == '+':
            w = word_model.wv[word1] + word_model.wv[word2]
        if ope == '-':
            w = word_model.wv[word1] - word_model.wv[word2]
        if ope == 'all' or ope == 'ALL':
            self.CheckWord2Vec(word1, word2, '+')
            self.CheckWord2Vec(word1, word2, '-')
            self.CheckWord2Vec(word2, word1, '-')
            return    
        print(word_model.most_similar([w]))
        
    def AnalyzeWord(self, word):
        gen = self.mecab.parse(word, as_nodes=True)
        for w in gen:
            return w.posid, w.surface

if __name__ == '__main__':
    begin = timer()
    pw = ProcessWord(8, 32, False)    
    #pw.ExtractWords()
    #pw.CreateWord2Vec()    
    pw.VectorizeWord()
    pw.DcganTrigger(4000, 256, 10)
    print(timer() - begin)
    pw.CheckWord2Vec('学院', '悪', 'ALL')