DCGANでテキスト生成を試してみた

こんにちは、Link-Uの町屋敷です。

今回は生成モデルで有名なGANさんで、Word2Vecを使って英語の文章を生成するものを見つけたので、日本語でもできるのかやってみました。

DCGANのサンプルを動かしてみる

今回はMITライセンスのこのプロジェクトのDCGANをフォークして使っていきます。

まずはそのままのコードを確認していきましょう。

DCGANはGANの一種です。
GAN(Generative Adversarial Networks)は、2つの学習器を互いに争わせることで、学習を進めていきます。2つの学習器とは、
入力された乱数からほしいもの(画像や文章など)を作るジェネレーター(以降G:生成器)と
画像や文章などを入力とし、それがGから生成されたなのかデータセットとして収集してきた本物なのかを見分けるディスクリミネーター(D:識別器)です。

サンプルでは、build_generator()でGの、build_discrimanator内でDの構造が定義されています。

今回はDCGANなので、両方の学習器で畳み込み層を使用しますがプーリング層はありません。

このサンプルでは、コンストラクタで使うネットワークの形と学習方法、損失関数などを定義しています。

Dでは、生成された画像と本物の画像を見分けることを目ざします。生成された画像に0のラベルを、本物の画像に1のラベルをつければ、通常の画像分類と同様なので実装は簡単です。

    # Build and compile the discriminator
        self.discriminator = self.build_discriminator()
        self.discriminator.compile(loss='binary_crossentropy',
            optimizer=optimizer2,
            metrics=['accuracy'])

Gもやっていることは簡単で、ノイズを入力して生成された画像をDに入れて、より高い確率で本物の画像たど判別されるように学習します。

プログラムにするとこうです。

何も考えずに作ってしまうと、Gのモデルの中にDが含まれているので、Gを学習するときにDも学習されてしまいます。

そこでGを学習するときにはDの値を変えないようself.discriminator.trainable = Falseで制限をかけています。

        # The generator takes noise as input and generates imgs
        z = Input(shape=(self.latent_dim,)) #雑音の形の定義
        img = self.generator(z) #生成された画像がimg

         # For the combined model we will only train the generator
        self.discriminator.trainable = False

        # The discriminator takes generated images as input and determines validity
        valid = self.discriminator(img) #Validが判定結果

        # The combined model  (stacked generator and discriminator)
        # Trains the generator to fool the discriminator
        self.combined = Model(z, valid) #入力が雑音で、出力が判定結果のモデル
        self.combined.compile(loss='binary_crossentropy', optimizer=optimizer)

あとは、train関数内でデータを入れたら完成です。

実際に走らせてみましょう。

0Epoch

ただの雑音が

200Epoch

4000Epoch

読める数字になっていっています。

文章生成用に改造する

さてここからが本題です。

先程のサンプルでは28*28の画像を入力にしていましたが、代わりに文字を変換して入力します。

そのままの文字を入力してもどうにもならないので、何らかの変換をしないといけないんですが、それが今回使うWord2Vecです。

正解の文章は前にWikipediaから取ってきた漫画の記事(約5000記事)のデータを使いまわします。

Word2Vecはその名の通り単語をベクトルに変換する技術で、これを使うことによって単語と単語の計算ができるようになります。

        all_sentences = joblib.load('{0}/all_sentences.pkl'.format(WRITE_JOBLIB_DIR))
        sentences = [[word for word in document.lower().split()] for document in all_sentences]
        
        print("Building Word2Vec")
        word_model = Word2Vec(sentences, size=63, min_count=1, window=5) 
        joblib.dump(word_model, '{0}/word2vec.pkl'.format(WRITE_JOBLIB_DIR))

変換はgensimを使って行います。Word2Vec関数に文章を渡せば辞書が出来ます。。簡単!

先にMecabでの分かち書きを忘れないよう注意。引数のsizeは変換後のベクトルの長さ、windowは。文章中の単語と単語の関係性を計算する距離を表す。この例だと5単語まで。

def VectorizeWord(self):
        all_sentences = joblib.load('{0}/all_sentences.pkl'.format(WRITE_JOBLIB_DIR))
        sentences = [[word for word in document.lower().split()] for document in all_sentences]
        word_model = joblib.load('{0}/word2vec.pkl'.format(WRITE_JOBLIB_DIR))
               
        n_words = 8
        # converted_sentences = [] 
        # converted_sentence = np.zeros(word_model.syn0.shape[0])
        input_data = []
        for s in sentences:
            vectorized_sentence = []
            word_count = 0
            for w in s:
                vector = word_model.wv[w]
                vectorized_sentence.append(vector)
                word_count += 1
                if w == '。':
                    if word_count < n_words:
                        for i in range(n_words - word_count):
                            vector = word_model.wv['。']
                            vectorized_sentence.append(np.append(vector, -1))
                            word_count += 1
                    if word_count == n_words:
                        input_data.append(vectorized_sentence)
                        vectorized_sentence = []
                        word_count = 0
                    elif word_count > n_words:
                        vectorized_sentence = []
                        word_count = 0
        print(np.shape(input_data))
        print(n_words * (word_model.layer1_size + 1))
        input_data = np.reshape(input_data, (-1, n_words * (word_model.layer1_size + 1)))  # 品詞IDの分がたされる
        print(np.shape(input_data))
        print(n_words * len(sentences))
        word_model = joblib.dump(input_data, '{0}/vectorized_words.pkl'.format(WRITE_JOBLIB_DIR))
        print(1)
            

作った辞書を使って単語をベクトルに変換していきます。(vector = word_model.wv[w])の部分。

ついでに1文章に含まれる単語数はまちまちなので、ある一定数に揃えます。一定数に満たない文章は空白で埋めたかったんですが、辞書に登録されてない単語は変換するときにエラーになるので「。」で埋めました。「行き,まし,た,。」 => 「行き,まし,た,。,。,。,。,。」

これで準備完了。先程の画像の代わりに変換した単語を入れていきます。

さてここで問題になるのは、どの形で、単語を渡すかです。

一つは、一単語を行ベクトルを列として(単語数*ベクトル数*1)の画像みたいに渡す方法。

ベクトルを画像で言うRGBみたいなものだと考えて(1*単語数*ベクトル数)もあります。

結論から言うと前者はあまりよろしくなかったです。

生成例がこれなんですけど

epoch0

・ メルチェリーダぷれりゅうどぷれりゅうどガンムマーズクロニクル歌野ゃがんはがちりんにとぶアプトアプトばろそぐいアダルトグッズショップばろケルビム・ゲート・キーパーゃがんはがちりんにとぶキザキ売り込もゃがんはがちりんにとぶゃがんはがちりんにとぶオゲ売り込もキザキ売り込も受け流そ歌野歌野ザ・ビーンズ歌野臨もザ・ビーンズ歌野ザ・ビーンズメルチェリーダ弖虎弖虎弖虎弖虎弖虎メルチェリーダアオノキセキウキウキペディアカードウキウキペディアカード

epoch400

・ 使用二鷹野大遠藤広隆小暮昌広程度桁異なり本当に他紙足取り後ろ他付属ボーナストラックトラック柔弱。。。

epoch0でも兆候が見えてるですが、epoch400ではなんか前の方に名詞固まってるし、トラックトラックみたいな繰り返しがよく起こってます。

とくにこの繰り返しが至るところに出現します。多分ゲネレーターの畳み込み層のせいだと思ったので、入力を後者の方に変更。単語数64も多すぎたので8まで減らした。

結果はこちら

epoch 0

・ 若作り繋っシャンツァカッシーギャグイニチアシブノレ混交デイブレイク

・ 年代デイブレイクビジネスインフラ・リミテッドオフィシャルパンフレット商業ドルドルーヴォドラゴン・サーガジャケットイラストギャラリー

・ 交ぜるスパークオンウェイヴオフィシャルパンフレット描くソガシイナ京田辺イリヤルートドラゴンボールメニュー

・ 福見オールドマン・パーサキュライナーツノートぬおイニチアシブゲームエッセイマンガ木城

・ あかしあ台ノスタルジーゅせんきょうジミナニードルアイフレスポ鬩ホラー

epoch 100

・ 選評再現カレイドスター・ウィンディステージ数。。。。

・ ホワイトナックル・クリムゾンオーブハンター両替キャストバトルオブフェアリーテイル選り抜いゲームパッケージイラスト全て。

・ カードダスステーション終盤ぶった斬る以後後半フルタッチ・アクション。。

・ 温州ニコラ・ケフェウス一般に約前後。。。

・ 時点キディ・ガーランドとおり前後予定同権。。

epoch 200

・ キャラウムカフェウルトラジャンプエッグサイト差し置き。日本一が上回り驚愕

・ ワイディーネ学期エロティック・コメディ。メジャーが上回りから

・ キャラウムカフェ学期エロティック・コメディガールズ・デート年代は上回り共に

・ ドルビーステレオコンサンタルト学期エロティック・コメディガールズ・デートアニヴァーサリーが上回り共に

・ キャラウムカフェ学期エロティック・コメディガールズ・デート以後が上回り奪え

さっきよりは多少マシになったが、まだまだ不自然。

epoch200ではモード崩壊気味でなぜか末尾の「。」が消えた。

ここから改善を始めていくが続きは来月に。

まとめ

一応生成は出来たが精度はまだまだ。

改善点は色々あるので来月はそれをやっていこうと思う。

具体的にはパラメーター調整とかMeCabの情報を使ってConditional GAN化してみるとかそもそもDCGANをやめるとかかなあ。

プログラム全文

'''
MIT License

Copyright (c) 2017 Erik Linder-Norén

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
'''

import re
import json

from timeit import default_timer as timer

import keras.backend as K

import numpy as np
import matplotlib
from matplotlib import pyplot as plt
import joblib

from natto import MeCab
import gensim
from gensim.models.word2vec import Word2Vec

try:
    from keras.engine.topology import Container
except:
    from keras.engine.network import Network as Container
from keras.datasets import mnist
from keras.layers.recurrent import LSTM
from keras.layers.embeddings import Embedding
from keras.models import Model, Sequential
from keras.layers import Dense, Activation, Reshape, UpSampling2D, Conv2D, BatchNormalization, Input, Dropout, ZeroPadding2D, Flatten
from keras.optimizers import Adam
from keras.layers.advanced_activations import LeakyReLU
from keras.losses import binary_crossentropy

TEXT_JSON_DIR = '../WikipediaComic/whole_data'
INFOBOX_JSON_DIR = '.'
INFOBOX_FILE_NAME = 'wiki_infobox_Infobox_animanga_Manga.json'
WRITE_JSON_DIR = '.'
WRITE_JSON_FILE_NAME = 'joined.json' 
WRITE_TEXT_RESULT_DIR = '.'
AUTO_ENCODER_DIR = '.'

WRITE_JOBLIB_DIR = '.'

# Matplotlibの日本語設定
font_path = '/usr/share/fonts/truetype/takao-gothic/TakaoPGothic.ttf'
font_prop = matplotlib.font_manager.FontProperties(fname=font_path)
matplotlib.rcParams['font.family'] = font_prop.get_name()

def cross_entropy_plus_simmilarity(y_true, y_pred, X_pred):
    simi = 0
    for i , v in enumerate(X_pred):
        for j, _ in enumerate(X_pred):
            if i > j:
                simi += K.square(X_pred[i] - X_pred[j])
    n = len(X_pred)
    return simi/(n*(n-1)/2) + binary_crossentropy(y_true, y_pred)

class DCGAN():

    def __init__(self, row, col, additional_featire_count = 0):
        self.additional_feature_count = additional_featire_count
        # Input shape
        self.img_rows = 1
        self.img_cols = col
        self.channels = row + self.additional_feature_count
        
        self.txt_shape = (self.img_cols, 1, self.channels)
        self.latent_dim = 200

        optimizer = Adam(0.0002, 0.5)

        # Build the generator
        self.generator = self.build_generator()
#        self.generator = self.WordGenerator()

        # Build and compile the discriminator
        self.discriminator = self.build_discriminator()
        self.discriminator.compile(loss='binary_crossentropy',
            optimizer=optimizer,
            metrics=['accuracy'])

        # The generator takes noise as input and generates imgs
        z = Input(shape=(self.latent_dim,))
        img = self.generator(z)

        # For the combined model we will only train the generator
        self.discriminator.trainable = False

        # The discriminator takes generated images as input and determines validity
        valid = self.discriminator(img)

        # The combined model  (stacked generator and discriminator)
        # Trains the generator to fool the discriminator
        self.combined = Model(z, valid)
        self.combined.compile(loss='binary_crossentropy', optimizer=optimizer)
        
    def build_generator(self):
    
        model = Sequential()
    
        model.add(Dense(128 * int((self.img_cols) * self.img_rows), activation="relu", input_dim=self.latent_dim))
        model.add(Reshape((self.img_cols , self.img_rows, 128)))
        model.add(Conv2D(128, kernel_size=3, padding="same"))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Activation("relu"))
        model.add(Conv2D(64, kernel_size=3, strides=1, padding="same"))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Activation("relu"))
        model.add(Conv2D(32, kernel_size=3, strides=1, padding="same"))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Activation("relu"))
        model.add(Conv2D(self.channels, kernel_size=3, padding="same"))
        model.add(Activation("tanh"))
    
        model.summary()
    
        noise = Input(shape=(self.latent_dim,))
        img = model(noise)
    
        return Model(noise, Container(noise, img)(noise)) 

    def build_discriminator(self):

        model = Sequential()
        print(self.txt_shape)
        model.add(Conv2D(32, kernel_size=3, strides=2, input_shape=self.txt_shape, padding="same"))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Conv2D(64, kernel_size=3, strides=2, padding="same"))
        model.add(ZeroPadding2D(padding=((0, 1), (0, 1))))
        model.add(BatchNormalization(momentum=0.8))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dropout(0.5))
        model.add(Conv2D(128, kernel_size=3, strides=2, padding="same"))
        model.add(BatchNormalization(momentum=0.8))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Flatten())
        model.add(Dense(1, activation='sigmoid'))

        model.summary()

        img = Input(shape=self.txt_shape)
        validity = model(img)

        return Model(img, Container(img, validity)(img))

    def train(self, epochs, batch_size=128, save_interval=50):
        word_model = joblib.load('{0}/word2vec.pkl'.format(WRITE_JOBLIB_DIR))
        X_train = joblib.load('{0}/vectorized_words.pkl'.format(WRITE_JOBLIB_DIR))
        print(np.shape(X_train))
        X_train = np.reshape(X_train, (-1, self.img_cols , 1, self.channels))
        print(np.shape(X_train))
        # Adversarial ground truths
        real = np.ones((batch_size, 1))
        fake = np.zeros((batch_size, 1))

        for epoch in range(epochs):

            # ---------------------
            #  Train Discriminator
            # ---------------------

            # Select a random half of images
            idx = np.random.randint(0, X_train.shape[0], batch_size)
            imgs = X_train[idx]

            # Sample noise and generate a batch of new images
            noise = np.random.normal(0, 1, (batch_size, self.latent_dim))
            gen_imgs = self.generator.predict(noise)

            # Train the discriminator (real classified as ones and generated as zeros)
            self.discriminator.trainable = True
            self.generator.non_trainable = False
            d_loss_real = self.discriminator.train_on_batch(imgs, real)
            d_loss_fake = self.discriminator.train_on_batch(gen_imgs, fake)
            d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

            # ---------------------
            #  Train Generator
            # ---------------------

            # Train the generator (wants discriminator to mistake images as real)
            self.discriminator.trainable = False
            self.generator.non_trainable = True
            g_loss = self.combined.train_on_batch(noise, real)

            # Plot the progress
            print ("%d [D loss: %f, acc.: %.2f%%] [G loss: %f]" % (epoch, d_loss[0], 100 * d_loss[1], g_loss))

            # If at save interval => save generated image samples
            if epoch % save_interval == 0:
                self.show_text(epoch)
    
    def show_text(self, epoch):
        r, c = 5, 5
        noise = np.random.normal(0, 1, (r * c, self.latent_dim))
        gen_text = self.generator.predict(noise)
        word_model = joblib.load('{0}/word2vec.pkl'.format(WRITE_JOBLIB_DIR))
        r, c = 5, 5
        noise = np.random.normal(0, 1, (r * c, self.latent_dim))
        gen_text = self.generator.predict(noise)
        word_model = joblib.load('{0}/word2vec.pkl'.format(WRITE_JOBLIB_DIR))
        with open(WRITE_TEXT_RESULT_DIR + '/generated_epoch{0}.txt'.format(epoch), "w+") as f:
            f.write('Epoch {0}rn'.format(epoch))
            for vector in gen_text:
                word = ''
                s = vector.flatten().reshape(self.img_cols, self.img_rows, self.channels)
                for w in s:   
                    if self.additional_feature_count > 0:
                        word = word + word_model.most_similar(w[:,0:-1*self.additional_feature_count])[0][0]
                    else:
                        word = word + word_model.most_similar(w)[0][0]
                print(word + 'rn')
                f.write(word)
    
    def save_imgs(self, epoch):
        r, c = 5, 5
        noise = np.random.normal(0, 1, (r * c, self.latent_dim))
        gen_imgs = self.generator.predict(noise)

        # Rescale images 0 - 1
        gen_imgs = 0.5 * gen_imgs + 0.5

        fig, axs = plt.subplots(r, c)
        cnt = 0
        for i in range(r):
            for j in range(c):
                axs[i, j].imshow(gen_imgs[cnt, :, :, 0], cmap='gray')
                axs[i, j].axis('off')
                cnt += 1
        fig.savefig("images/mnist_%d.png" % epoch)
        plt.close()


class ProcessWord():

    def __init__(self, word_max, vector_size, do_use_pos = True):
        self.mecab = MeCab()
        self.word_max = word_max
        self.vector_size = vector_size
        self.do_use_pos = do_use_pos
        
    def DcganTrigger(self, epochs=4000, batch_size=64, save_interval=10):
        if self.do_use_pos:
            dg = DCGAN(self.vector_size, self.word_max, 1)
        else:
            dg = DCGAN(self.vector_size, self.word_max, 0)
        dg.train(epochs=epochs, batch_size=batch_size, save_interval=save_interval)

    def ExtractWords(self):
        with open(WRITE_JSON_DIR + '/' + WRITE_JSON_FILE_NAME, 'r', encoding='utf_8') as jw: 
            json_data = json.load(jw)
        
        all_words = [[0]] * len(json_data) 
        with MeCab("-Owakati") as mecab:
            for i, j in enumerate(json_data):
                if i % 100 == 0:
                    print(i)
                text = j['text']
                
                all_words[i] = re.sub(r'[!-~]', "", mecab.parse(text))  # 記号、英語除去
                
        joblib.dump(all_words, '{0}/all_sentences.pkl'.format(WRITE_JOBLIB_DIR), compress=True)
        
    def CreateWord2Vec(self):
        all_sentences = joblib.load('{0}/all_sentences.pkl'.format(WRITE_JOBLIB_DIR))
        sentences = [[self.AnalyzeWord(word)[1] for word in document.lower().split()] for document in all_sentences]
        
        print("Building Word2Vec")
        word_model = Word2Vec(sentences, size=self.vector_size, min_count=1, window=5)
        joblib.dump(word_model, '{0}/word2vec.pkl'.format(WRITE_JOBLIB_DIR))
        # Code tried to prepare LSTM model for word generation
        
    def VectorizeWord(self):
        all_sentences = joblib.load('{0}/all_sentences.pkl'.format(WRITE_JOBLIB_DIR))
        sentences = [[word for word in document.lower().split()] for document in all_sentences]
        word_model = joblib.load('{0}/word2vec.pkl'.format(WRITE_JOBLIB_DIR))
               
        n_words = self.word_max
        # converted_sentences = [] 
        # converted_sentence = np.zeros(word_model.syn0.shape[0])
        input_data = []
        pos_labels = [] 
        for s in sentences:
            vectorized_sentence = []
            pos = []
            word_count = 0
            #print(s)
            for w in s:
                posid, surface = self.AnalyzeWord(w)
                vector = word_model.wv[surface]
                if self.do_use_pos:
                    vector = np.append(vector, float((posid - 34) / 68))  # POSID正規化
                pos.append(float((posid - 34) / 68))
                vectorized_sentence.append(vector)
                word_count += 1
                if w == '。':
                    if word_count < n_words:
                        for i in range(n_words - word_count):
                            vector = word_model.wv['。']
                            if self.do_use_pos:
                                vectorized_sentence.append(np.append(vector, -1))
                            else:
                                vectorized_sentence.append(vector)
                            pos.append(-1)
                            word_count += 1
                    if word_count == n_words:
                        input_data.append(vectorized_sentence)
                        pos_labels.append(pos)
                        vectorized_sentence = []
                        pos =  []
                        word_count = 0
                    elif word_count > n_words:
                        vectorized_sentence = []
                        pos = []
                        word_count = 0
                        
        print(np.shape(input_data))
        print(n_words * (word_model.layer1_size + 1))
        if self.do_use_pos:
            input_data = np.reshape(input_data, (-1, n_words * (word_model.layer1_size + 1)))  # 品詞IDの分がたされる
        else:
            input_data = np.reshape(np.array(input_data), (-1, n_words * (word_model.layer1_size)))
        print(np.shape(input_data))
        print(n_words * len(sentences))
        word_model = joblib.dump(input_data, '{0}/vectorized_words.pkl'.format(WRITE_JOBLIB_DIR))
        joblib.dump(input_data, '{0}/vectorized_words.pkl'.format(WRITE_JOBLIB_DIR))
        joblib.dump(pos_labels, '{0}/pos_labels.pkl'.format(WRITE_JOBLIB_DIR))
        print(1)
        
    def CheckWord2Vec(self, word1, word2, ope):
        word_model = joblib.load('{0}/word2vec.pkl'.format(WRITE_JOBLIB_DIR))
        if ope == '+':
            w = word_model.wv[word1] + word_model.wv[word2]
        if ope == '-':
            w = word_model.wv[word1] - word_model.wv[word2]
        if ope == 'all' or ope == 'ALL':
            self.CheckWord2Vec(word1, word2, '+')
            self.CheckWord2Vec(word1, word2, '-')
            self.CheckWord2Vec(word2, word1, '-')
            return    
        print(word_model.most_similar([w]))
        
    def AnalyzeWord(self, word):
        gen = self.mecab.parse(word, as_nodes=True)
        for w in gen:
            return w.posid, w.surface

if __name__ == '__main__':
    begin = timer()
    pw = ProcessWord(8, 32, False)    
    #pw.ExtractWords()
    #pw.CreateWord2Vec()    
    pw.VectorizeWord()
    pw.DcganTrigger(4000, 256, 10)
    print(timer() - begin)
    pw.CheckWord2Vec('学院', '悪', 'ALL')

GPUを使って無線LANをクラックする話 / モニター、ロック、スケーラビリティ

弊社は最近、それまでの神谷町(東京タワーの近く)から、神田と秋葉原の間くらいに引っ越しをしました。とは言っても、去年の夏の終わりぐらいなので、だいぶ前なのですが。

オフィスの立地で重要なことって、何でしょう。駅からの距離?ビルの高さ?地名のブランド?歓楽街からの近さ?ランチ営業をやってるレストランの多さ?

みなさま、各々重視する点があると思うのですが、「一番近いコンビニが何か」は意外と見過ごされるところではないでしょうか。朝ちょっと寄ってコーヒーを買ったり、お弁当を買ったり。結構お世話になる所だと思います。

というわけで今月の写真は「セブンイレブンのカフェラテ・マシンの『顔』」です。いままでの所は一番近いコンビニが「セブン・イレブン」だったのが「ファミマ!」になってしまって、この眉毛のかわいい顔が見れなくちゃってさ。ちょっとさびしいよね。

…という世間話(?)をしたら、「へ、へぇ…(困惑)」って言われました。

コミュニケーション、難しいね!

前回までのあらすじ

前回分かってしまったことは、仕事を要求するCPUやGPUが増えれば増えるほど、パスワードの解析を依頼する役割を担うPythonのメイン・スレッドは、ロックの取り合いで時間を使い潰すようになってしまうということでした。ここを改善すれば、CPUやGPUがもっと活用できるかも。

このロックは、実際にはどのように使われているのでしょうか?ちょっとここで、Pyritで仕事が分配される様を眺めてみましょう。

Pyritでの「流れ作業」の様子を見学する

Pyritのベンチマークモードでは、メインのPythonスレッドで候補となるパスワードを生成した後に、それらをCPUやGPUに分配しています。で、並列に計算された結果は再度メインのスレッドでCPyritクラスが受け取る、という流れ作業になっています。その流れを見ていきましょう。

仕事の受付け

CPyrit.enqueueに渡されます。このメソッドは、パスワードを生成するPythonのメインスレッドから呼ばれます:

    def enqueue(self, essid, passwords, block=True):
        with self.cv:
            # ... 省略 ...
            passwordlist = list(passwords)
            if len(self.inqueue) > 0 and self.inqueue[-1][0] == essid:
                self.inqueue[-1][1][self.in_idx] = passwordlist
            else:
                self.inqueue.append((essid, {self.in_idx: passwordlist}))
            self.workunits.append(len(passwordlist))
            self.in_idx += len(passwordlist)
            self.cv.notifyAll()

self.cvはthreading.Conditionで、いわゆるモニターと呼ばれている排他制御のためのしくみ、です。「モニタ」って言葉は改めて聞くことはあんまり無い気がするんですが、Javaでおなじみのwait/notify/notifyAllのアレといえば通じる人もいるかもしれません。なにはともあれ、with self.cv: とすることで、このブロックの中のコードが実行されるスレッドが高々1つになるように制御しています。その後の処理は基本的には仕事をself.inqueueその他に適切にセットしているだけ、です。

適切に情報をセットしたら、self.cv.notifyAll() して他のスレッドにnotify(通知)します。具体的に何が起こるのかというと、with self.cv: の中でself.cv.wait() している他のスレッドが全て起こされます。じゃあwith self.cv: しているのは何箇所かというと、これを含めてなんと7箇所もあります。今回はそのうち、たくさん使われていそうなコードパスを眺めていきます。

仕事の受取り

inqueueに入れられた仕事は、CPUやGPUの各スレッドからCPyrit._gatherというメソッドを呼ぶことで受け取られます:

    def _gather(self, desired_size, block=True, timeout=None):
        t = time.time()
        with self.cv:
            passwords = []
            pwslices = []
            cur_essid = None
            restsize = desired_size
            while True:
                self._check_cores()
                for essid, pwdict in self.inqueue:
                   # ... passwordsを埋め、self.inqueueからそのぶん削除する処理 ...
                if len(passwords) > 0:
                    wu = (cur_essid, tuple(passwords))
                    try:
                        self.slices[wu].append(pwslices)
                    except KeyError:
                        self.slices[wu] = [pwslices]
                    self.cv.notifyAll()
                    return wu
                else:
                    if block:
                        if timeout is not None and time.time() - t > timeout:
                            return None, None
                    else:
                        return None, None
                    self.cv.wait(0.1)

self.inqueueからself.slicesに情報を移しつつ(self.slices[wu] = … )、WorkUnitである(ESSID, passwords)のタプルをCPUやGPUのために返します。この情報を使って、CPUはSSEを、GPUはCUDAやOpenCLを使って結果(results )を計算します。今回は情報のやり取りだけに注目するので、実際にどのように計算をするのかは省略する三分クッキング方式で次に進みましょう。

計算した結果を通知するために使われるのが次の_scatter メソッドです:

結果の受付け

CPUやGPUの各スレッドで計算された結果は、CPyrit._scatterメソッドを使って集められます:

    def _scatter(self, essid, passwords, results):
        assert len(results) == len(passwords)
        with self.cv:
            wu = (essid, passwords)
            slices = self.slices[wu].pop(0)
            if len(self.slices[wu]) == 0:
                del self.slices[wu]
            ptr = 0
            for idx, length in slices:
                self.outqueue[idx] = list(results[ptr:ptr + length])
                ptr += length
            for idx in sorted(self.outqueue.iterkeys(), reverse=True)[1:]:
                res = self.outqueue[idx]
                o_idx = idx + len(res)
                if o_idx in self.outqueue:
                    res.extend(self.outqueue[o_idx])
                    del self.outqueue[o_idx]
            self.cv.notifyAll()

self.slicesの情報を消費・削除しつつ、self.outqueue に結果を載せています。難しいところはないですね。

結果の受取り

最後に、Pythonのメインスレッドが、CPyrit.dequeueメソッドを使ってCPUやGPUの各スレッドがパスワードを受取ります:

    def dequeue(self, block=True, timeout=None):
        t = time.time()
        with self.cv:
            if len(self.workunits) == 0:
                return None
            while True:
                wu_length = self.workunits[0]
                if self.out_idx not in self.outqueue 
                 or len(self.outqueue[self.out_idx]) < wu_length:
                    self._check_cores()
            # ... 十分な数の結果が得られない時に待つ処理 ...
                else:
                    reslist = self.outqueue[self.out_idx]
                    del self.outqueue[self.out_idx]
                    results = reslist[:wu_length]
                    self.out_idx += wu_length
                    self.outqueue[self.out_idx] = reslist[wu_length:]
                    self.workunits.pop(0)
                    self.cv.notifyAll()
                    return tuple(results)

self.outqueue から結果を適切に取り出してself.outqueue から削除して、結果をreturnするだけです。

ここから更にCrackerがクラックに成功したかの判定をするわけですが、benchmarkではその処理は省略されているので、今回もここで止めておきます。

ここまでの観察

self.inqueue とself.outqueue の2本のキューがあることがわかりました。inqueueはパスワードを、outqueueにはパスワードから計算した結果をやり取りするために使われています。

しかし、それらを保護するためのthreading.Conditionは2つ…ではなく、1つのself.cv だけです。つまり、実際にはself.inqueueしかいじっていなくても、self.outqueueのための保護も同時に行っていることになります。…なんだか、無駄な感じがしませんか?それぞれのために2つthreading.Conditionを作って管理できないんでしょうか?

しかしながら、これは言うのは簡単なんですが、実際にやるのは難しそうです。まず、self.inqueueとself.outqueueは実は完全に独立しているわけではありません。_scatterや_gatherでいじられるself.slicesを介して微妙に両者は繋がっています。なので、実はCondition変数を2つそのまま使うだけでは正しくモニターを分割できません。self.slices用の排他制御も作りこめば分割できます。が、そういう事をするとデッドロックが起きがちなので、正直避けたい。

Pythonはシングルスレッドでしか動かないので、それぞれのQueueのためにthreading.Conditionを2つにわけられたとしても、Javaの時と違って2スレッドが並列で動くようになったりはしません。が、同じConditionを取り合うことはなくなるので、性能の向上が見込まれる…かもしれません。ロックは一般にスケーラブルではない(待機するスレッドの数が増えれば性能が悪化する)からです。

今回はプロファイリングの結果から言ってあまり実益は無さそうなのですが、もしConditionが2つに分割できればself.cv.wait() が無駄に起こされる回数が減るのは間違いない、ということはコメントさせてください。上のself.cv.wait() をしているコードを丹念に眺めてもらうと分かるのですが、無駄に起こされる事に備えて、特定の条件が満たされているかチェックして、条件が満たされてなかったら「やっぱ駄目じゃん!」と叫びながらもう一度眠りにつくようになっています(これ自体は一般的な使い方です)。さらに余談ながら、この「無駄に起こされるスレッド」が存在するので、self.cv.notify() ではなく、self.cv.notifyAll() を使わねばなりません。もしself.cv.notify()を使ってしまったら、運悪く無関係なスレッドだけが起きて、もう一度self.cv.wait() で眠りにつき、その後は何も起こらなくなり、永遠に応答しなくなってしまう可能性があります。

待機するのは最大何スレッド?

ついでに、with self.cv:self.cv.wait()で待つことになるスレッドの数を概算しましょう。CPUは48スレッド、GPUは4スレッドと仮定します。

  • enqueue:たかだか1スレッド(Pythonのメインスレッド)
  • _gather:最大52スレッド(CPU48コア + GPU4コア)
  • _scatter:最大52スレッド(CPU48コア + GPU4コア)
  • dequeue:たかだか1スレッド(Pythonのメインスレッド)

各スレッドが同時に2つのメソッドで待機することはないので、全体でもself.cvを取り合うのは最大53スレッドと思って間違いないと思います。

ふーむ。ここまで観察したので、次はwith self.cv:self.cv.wait()で待つ時、なにが行われているのか観察しましょう。このthreading.Conditionのソースは、Python本体にあるのでそちらを参照していきます。

threading.Conditionの中を眺める

with self.cv:って具体的に何をしているの?

この↑文字列を書いた時、実際にはself.cv.__enter__という関数が呼ばれます(これはPythonでのお約束です)。で、実装はこんな感じです

    def __enter__(self):
        return self._lock.__enter__()

self._lockはRLock(再入可能ロック)で、この関数はさらにC言語で実装されておりまして

static PyMethodDef rlock_methods[] = {
    {"acquire",      (PyCFunction)(void(*)(void))rlock_acquire,
     METH_VARARGS | METH_KEYWORDS, rlock_acquire_doc},
//....
    {"__enter__",    (PyCFunction)(void(*)(void))rlock_acquire,
     METH_VARARGS | METH_KEYWORDS, rlock_acquire_doc},
// ...
};

__enter__はacquireと同じ関数で、rlock_acquireです。

その先の実装は結構複雑そうですので一端置いておきます。pthreadとか適当にwrapしてるだけかと思ったけどそんなことは無い。

self.cv.wait()は実際には何を待っているのか?

with self.cv:の中では必ずself.cv.wait()を呼ぶことで他のスレッドが処理するのを待っています。この処理は何をしているのかというと

    def wait(self, timeout=None):
        # ... 前処理 ...
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                if timeout > 0:
                    gotit = waiter.acquire(True, timeout)
                else:
                    gotit = waiter.acquire(False)
            return gotit
        finally:
            self._acquire_restore(saved_state)
            if not gotit:
                try:
                    self._waiters.remove(waiter)
                except ValueError:
                    pass

新しくlockを作り、このロックに対して二度acquireをしています。このロックは再入可能ではないので、この2度めのacquireでスレッドは必ず中断されて、ほかのスレッドが代わりに起こされます。で、その起こされた別のスレッドがnotifyした時にロックがreleaseされて、二度目のロック以降の中断されたプログラムが実行される、という仕組みになっている、と。

notify/notifyAllするときにはロックに対するacquireは行われません。releaseだけ行います。前回ロックのacquireで時間を使い切っていることがわかりましたが、つまり、時間を食ってるのはこのwith self.cv:の中のacquireか、self.cv.wait()の中のacquireのどちらか、ということになります。

ロックに掛かる時間の内訳を見る

条件lockのacquireをした回数(掛かった時間)with self.cv:した回数(掛かった時間)self.cv.wait()を呼び出した回数(掛かった時間)
1cpu/0gpu1963回(0.514秒)204回(0.002秒)51回(69.438秒)
48cpu/0gpu7355回(3.202秒)1781回(1.988秒)756回(66.133秒)
0cpu/4gpu5689回(7.122秒)5645回(6.685秒)8回(0.762回)
48cpu/4gpu5113回(44.287秒)4676回(43.910秒)309回(3.161 秒)

回数がいまいち釣り合わない気がするんですが(0cpu/4gpuのself.cv.wait()の呼び出しがたった「8回」って本当かいな??)、仕事をとりあえば取り合うようになるほどwith self.cv: を実行する回数が増え、実行時間も掛かるようになっていくことが読み取れます。で、このwith self.cv:は、新しく生成したロックに対して獲得(acquire)しようとするwait()の時と違って、1つのself.cv._lock を獲得しようとしています。

これらの観察結果から、次の仮定を立てます:「with self.cv: したときに起こるself.cv._lock の取り合いがボトルネックの1つである」。

ここ、なんとかならないんでしょうか。次回実験してみましょう。

余談ですが、CPUだけで実行している時、どちらもself.cv.waitに60秒以上掛かっていますが、これらは実際には概ねthread.Sleepの実行に費やされています(つまりスレッドがやることなくて完全に遊んでいる)。

今月のまとめ

  • Pyritのパスワード候補と結果のリスト(inqueue/outqueue)は、threading.Condition(モニター)を使って排他制御されている
  • モニターは複数のロックを使って制御されている:
    • モニター自体のロック(cv._lock)
    • 待機しているスレッドを起こすためのロック(cv.wait()の中で生成されるwaiterという名前のロック)
  • プロファイリングの結果、モニター自体のロックを獲得しようとするスレッドが増えた結果として時間を使い潰していそう
    • このモニターでの排他制御を改善したら全体の性能も改善してくれないかなぁ

明日から東京では花粉が飛散しはじめるそうです。花粉症の方も、まだそうでない方も、みなさまお気をつけて!

GPUを使って無線LANをクラックする話 Pythonのプロファイルを取る回

近所を散歩していたら、もう梅が咲いていました。今日は立春だそうです。早いなー。

わたしは梅のつぼみが大好きなので、これからはしばらく目が離せない日々が続きそうです。

前回のおさらい

前回得た結果は、「CPUとGPUを同時に使うより、GPUだけを使う方が実は倍ぐらい速い!」という、これまた逆説的な結果でした。

未改造のPyritでは「CPUとGPUを同時」に使うか、「CPUだけ」を使うかなので、なまじ「同時」が「CPUだけ」よりは速いが故に、そんな単純な事にも気づかなかった、と。

うーん…なんでこんな事に…。GPUを追加で使うぶんだけ、せめてちょびっとでも性能が向上してほしかったんですが…。

Pythonのコードのプロファイルを取ろう

これまでにCUDAコードのプロファイルを二回ほど取りました(一回目二回目)が、今日はPythonのコードのプロファイルを取りましょう。もちろんPythonはCPUで実行されるわけですが、さらにPythonは高々1つのスレッドでしか同時に動作しないのでした。

今回Pythonのプロファイルを取ろうと画策するのは、次の観察からです:

  • 2CPU、48コアを全部ぶん回すと48倍の性能がでない。htopを観察する限り、かといってCPUを全部使っているわけでもなさそう
    • Pythonの1コアが各コアに仕事を分配しきれていないと考えると説明できます(わかんないけどね)
  • GPUだけを使うにしても、GPUは使い切っていなさそう
    • ここがよくわからない。GPUが消費しきる分のパスワードは生成できそうなので。
  • ダミーパスワードの生成に何割くらい掛けているのかが気になる

というわけでモリモリ取っていきまっしょい。プロファイルのためのツールは公式で提供されていて、cProfileというのを使えば良さそうです。

% python -m cProfile -s tottime <実行ファイル名> <args...>

とやると、関数の内部の実行に掛かった順番でソートされた結果が帰ってきます。tottimeは、関数の中で他の関数を呼び出した時間は除いた、その関数の純粋な実行時間。cumtimeと入れると、その関数から呼び出した関数の時間も含まれます。

例えば、CPUを1コアだけ使った時の表示はこんな感じになります:

(venv) server01@server01:~/src/Pyrit$ time python2.7 -m cProfile -s tottime ./pyrit benchmark
Pyrit 0.5.1 (C) 2008-2011 Lukas Lueg - 2015 John Mora
https://github.com/JPaulMora/Pyrit
This code is distributed under the GNU General Public License v3+

Running benchmark (985.4 PMKs/s)... -  

Computed 985.36 PMKs/s total.
#1: 'CPU-Core (SSE2/AES)': 1045.2 PMKs/s (RTT 2.8)
         79276 function calls (78919 primitive calls) in 69.771 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1604   68.890    0.043   68.890    0.043 {time.sleep}
     1963    0.514    0.000    0.514    0.000 {method 'acquire' of 'thread.lock' objects}
        1    0.158    0.158    0.197    0.197 cpyrit.py:29(<module>)
        1    0.094    0.094   69.571   69.571 pyrit_cli.py:1184(benchmark)
       51    0.028    0.001   69.438    1.362 threading.py:309(wait)
    54392    0.011    0.000    0.011    0.000 {method 'random' of '_random.Random' objects}

...(略)...

real	1m9.902s
user	1m9.087s
sys	0m0.302s

ログの一番上の行を見ることで、time.sleepで68.9秒とたくさん時間を使っていること、次に使っているのがacquireメソッドであること、がわかります。

さらに、

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.094    0.094   69.571   69.571 pyrit_cli.py:1184(benchmark)
real	1m9.902s

この二行を見ると、このベンチマークを実際に実行するbenchmark関数が69.5秒使っていること、処理時間が69.9秒で、その前後の初期化などに0.4秒ぐらい使われていること(まぁ、予想通りですよね)、などがわかりますよー、と。

この2つの情報を合計すると、CPUを1コアだけ使った時は、69.5秒中68.9秒、ほとんどtime.sleepで時間を潰していること(1コアのCPUが処理するのを待っていること)がなんとなくわかります。

以下前回みたく条件を変えて実行してみましたが、だらだらログを並べても中々わかりにくいので、表にしてみました:

条件1番時間を使っている関数
(全処理に占める割合)
2番時間3番時間4番時間
1cpu/0gpu{time.sleep}
(99.0%)
{method ‘acquire’ of ‘thread.lock’ objects}
(0.7%)
cpyrit.py(<module>)
(0.2%)
pyrit_cli.py(benchmark)
(0.1%)
48cpu/0gpu{time.sleep}
(92.0%)
{method ‘acquire’ of ‘thread.lock’ objects}
(4.5%)
pyrit_cli.py(benchmark)
(2.21%)
cpyrit.py(<module>)
(0.22%)
0cpu/4gpupyrit_cli.py(benchmark)
(61.3%)
{method ‘flush’ of ‘file’ objects}
(14.6%)
{method ‘acquire’ of ‘thread.lock’ objects}
(12.4%)
cpyrit.py(dequeue)
(4.04%)
48cpu/4gpu{method ‘acquire’ of ‘thread.lock’ objects}
(65.2%)
pyrit_cli.py:1184(benchmark)
(16.5%)
cpyrit.py(dequeue)
(5.0%)
{time.sleep}
(3.8%)

だいたい上位に並ぶのはどれも同じ顔ぶればかりなのですが、下のほうに行けばいくほど、つまり、たくさんのCPUやGPUが仕事を要求するようになればなるほど、sleepよりも{method ‘acquire’ of ‘thread.lock’ objects}の処理時間がガンガン増えていく事がわかります。一番下の一番処理を取り合っているところではなんと6割もロックにつぎ込んでいます。これじゃあ、GPUのパスワードのクラックではなく、ロックを取り合うプログラムを実行していたと言っても過言ではありませんな。

そしてdequeue関数の処理内容が増えていくのも気になりますねぇ。この関数は、生成したパスワードをCPUやGPUが受け取るためのもので、threading.Conditionを使ってマルチスレッドの調停を行っております。

この2つから予想されることは、…おそらくこのthreading.Conditionオブジェクトの中にあるロックを取り合ってるんでしょうね…。

あまりにも闇が深そうなので今日はこのぐらいにしておきましょう。

GPUだけを使っているケース(三行目)については追加でコメントさせてください。benchmark関数が一番時間を使っていることから、一生懸命処理するためのパスワードを用意していることはなんとなく察されるのですが、パスワードの生成に使っているはずのrandom関数がリストアップされてこなくて、そのかわりにfile.flushが二番目に食い込んでいます。このflushはCUDA関係っぽい気がしますが、謎です。もうちょっと追いかける価値があると思います。

まとめ

  • 仕事を要求するスレッドが増えれば増えるほど、仕事を用意するスレッドはsleepではなくlockの取り合いで時間を潰すようになる
    • 1つのロックを取り合ってそう
  • GPUだけで処理しているときはパスワードの生成を頑張るみたいだが、実態は要調査
  • 外へ出ろ、梅の花を見ろ!

今月は低電力モードとなっております。寒いからね、仕方ないね(ごめんなさい)。