遺伝的アルゴリズムによるナップサック問題の最適化

以前遺伝的アルゴリズムの入門的なOneMax問題を解いた。

testpy.hatenablog.com

今回は、これよりも少し複雑なナップサック問題を解く。

ナップサック問題とは

Wikipedia によると、ナップサック問題とは次のような問題である。

ナップサック問題は、計算複雑性理論における計算の難しさの議論の対象となる問題の一つで、 「容量 C のナップサックが一つと、n 種類の品物(各々、価値 pi, 容積 ci)が与えられたとき、 ナップサックの容量 Cを超えない範囲でいくつかの品物をナップサックに詰め、 ナップサックに入れた品物の価値の和を最大化するにはどの品物を選べばよいか」という整数計画問題である。 同じ種類の品物を1つまでしか入れられない場合(xi ∈ {0, 1})や、 同じ品物をいくつでも入れてよい場合(xi ∈ 0以上の整数)など、いくつかのバリエーションが存在する。

今回は容量Cは無視する。 いくらでも品物を詰め込めるが、価値を最大化しつつ、容積ではなく重さを最小化する。 また同じ品物をいくつでも入れて良いことにする。

個体生成、評価、エリート選出

品物が価値と重さという2つの変数を持つ。 まずはこれらを20個生成する。

# input
for i in xrange(N_ITEMS):
    self.items[i] = (random.randint(0, 100), random.randint(1, 10))  # value, weight
# output
{0: (90, 5), 1: (18, 5), 2: (25, 7), 3: (12, 7), 4: (52, 2), 5: (77, 10), 6: (72, 3), 7: (2, 10), 8: (89, 6), 9: (36, 10), 10: (22, 9), 11: (47, 6), 12: (45, 3), 13: (65, 6), 14: (25, 6), 15: (71, 5), 16: (81, 9), 17: (11, 6), 18: (90, 10), 19: (32, 5)}

20個の品物の中から5つずつランダムに選んだセットを20個用意する。

# input
pop = []
for i in range(N_POP):
    ind = [self.items[k] for k in random.sample(range(N_ITEMS), 5)]
    pop.append(ind)
# output
[(25, 7), (11, 6), (2, 10), (18, 5), (77, 10)]
[(22, 9), (65, 6), (77, 10), (89, 6), (81, 9)]
[(72, 3), (81, 9), (52, 2), (25, 6), (18, 5)]
[(36, 10), (45, 3), (81, 9), (90, 5), (90, 10)]
[(25, 6), (81, 9), (36, 10), (89, 6), (77, 10)]
[(65, 6), (72, 3), (90, 5), (25, 6), (12, 7)]
[(18, 5), (2, 10), (71, 5), (11, 6), (36, 10)]
[(77, 10), (12, 7), (25, 6), (47, 6), (71, 5)]
[(89, 6), (47, 6), (25, 7), (65, 6), (25, 6)]
[(52, 2), (25, 6), (18, 5), (36, 10), (89, 6)]
[(71, 5), (65, 6), (36, 10), (25, 6), (32, 5)]
[(2, 10), (32, 5), (25, 6), (81, 9), (71, 5)]
[(12, 7), (36, 10), (71, 5), (22, 9), (81, 9)]
[(36, 10), (18, 5), (12, 7), (90, 10), (72, 3)]
[(77, 10), (22, 9), (90, 5), (25, 6), (11, 6)]
[(22, 9), (89, 6), (71, 5), (72, 3), (65, 6)]
[(77, 10), (32, 5), (52, 2), (89, 6), (45, 3)]
[(47, 6), (2, 10), (18, 5), (25, 6), (89, 6)]
[(65, 6), (2, 10), (89, 6), (52, 2), (45, 3)]
[(36, 10), (11, 6), (2, 10), (89, 6), (65, 6)]

これらの個体として、各々評価を行う。 と言っても、価格と重さを各々足し合わせるだけ。

# input
def clac_score(self, indivisual):
    dic = {}
    dic['score0'] = 0  # value
    dic['score1'] = 0  # weight
    for ind in indivisual:
        dic['score0'] += ind[0]
        dic['score1'] += ind[1]
# output
{'score0': 133, 'score1': 38, 'param': [(25, 7), (11, 6), (2, 10), (18, 5), (77, 10)]}
{'score0': 334, 'score1': 40, 'param': [(22, 9), (65, 6), (77, 10), (89, 6), (81, 9)]}
{'score0': 248, 'score1': 25, 'param': [(72, 3), (81, 9), (52, 2), (25, 6), (18, 5)]}
{'score0': 342, 'score1': 37, 'param': [(36, 10), (45, 3), (81, 9), (90, 5), (90, 10)]}
{'score0': 308, 'score1': 41, 'param': [(25, 6), (81, 9), (36, 10), (89, 6), (77, 10)]}
{'score0': 264, 'score1': 27, 'param': [(65, 6), (72, 3), (90, 5), (25, 6), (12, 7)]}
{'score0': 138, 'score1': 36, 'param': [(18, 5), (2, 10), (71, 5), (11, 6), (36, 10)]}
{'score0': 232, 'score1': 34, 'param': [(77, 10), (12, 7), (25, 6), (47, 6), (71, 5)]}
{'score0': 251, 'score1': 31, 'param': [(89, 6), (47, 6), (25, 7), (65, 6), (25, 6)]}
{'score0': 220, 'score1': 29, 'param': [(52, 2), (25, 6), (18, 5), (36, 10), (89, 6)]}
{'score0': 229, 'score1': 32, 'param': [(71, 5), (65, 6), (36, 10), (25, 6), (32, 5)]}
{'score0': 211, 'score1': 35, 'param': [(2, 10), (32, 5), (25, 6), (81, 9), (71, 5)]}
{'score0': 222, 'score1': 40, 'param': [(12, 7), (36, 10), (71, 5), (22, 9), (81, 9)]}
{'score0': 228, 'score1': 35, 'param': [(36, 10), (18, 5), (12, 7), (90, 10), (72, 3)]}
{'score0': 225, 'score1': 36, 'param': [(77, 10), (22, 9), (90, 5), (25, 6), (11, 6)]}
{'score0': 319, 'score1': 29, 'param': [(22, 9), (89, 6), (71, 5), (72, 3), (65, 6)]}
{'score0': 295, 'score1': 26, 'param': [(77, 10), (32, 5), (52, 2), (89, 6), (45, 3)]}
{'score0': 181, 'score1': 33, 'param': [(47, 6), (2, 10), (18, 5), (25, 6), (89, 6)]}
{'score0': 253, 'score1': 27, 'param': [(65, 6), (2, 10), (89, 6), (52, 2), (45, 3)]}
{'score0': 203, 'score1': 38, 'param': [(36, 10), (11, 6), (2, 10), (89, 6), (65, 6)]}

価格を最大化して、重さを最小化するには、 価格を降順、重さを昇順にソートする。 pure pythonだとソートコードが複雑になりそうだったので、Pandasを利用した。

# input
df = pd.DataFrame(fitness)
df = df.sort(['score0', 'score1'], ascending=[False, True])
# output
                                              param  score0  score1
3   [(36, 10), (45, 3), (81, 9), (90, 5), (90, 10)]     342      37
1    [(22, 9), (65, 6), (77, 10), (89, 6), (81, 9)]     334      40
15    [(22, 9), (89, 6), (71, 5), (72, 3), (65, 6)]     319      29
4   [(25, 6), (81, 9), (36, 10), (89, 6), (77, 10)]     308      41
16   [(77, 10), (32, 5), (52, 2), (89, 6), (45, 3)]     295      26
5     [(65, 6), (72, 3), (90, 5), (25, 6), (12, 7)]     264      27
18    [(65, 6), (2, 10), (89, 6), (52, 2), (45, 3)]     253      27
8     [(89, 6), (47, 6), (25, 7), (65, 6), (25, 6)]     251      31
2     [(72, 3), (81, 9), (52, 2), (25, 6), (18, 5)]     248      25
7    [(77, 10), (12, 7), (25, 6), (47, 6), (71, 5)]     232      34
10   [(71, 5), (65, 6), (36, 10), (25, 6), (32, 5)]     229      32
13  [(36, 10), (18, 5), (12, 7), (90, 10), (72, 3)]     228      35
14   [(77, 10), (22, 9), (90, 5), (25, 6), (11, 6)]     225      36
12   [(12, 7), (36, 10), (71, 5), (22, 9), (81, 9)]     222      40
9    [(52, 2), (25, 6), (18, 5), (36, 10), (89, 6)]     220      29
11    [(2, 10), (32, 5), (25, 6), (81, 9), (71, 5)]     211      35
19   [(36, 10), (11, 6), (2, 10), (89, 6), (65, 6)]     203      38
17    [(47, 6), (2, 10), (18, 5), (25, 6), (89, 6)]     181      33
6    [(18, 5), (2, 10), (71, 5), (11, 6), (36, 10)]     138      36
0    [(25, 7), (11, 6), (2, 10), (18, 5), (77, 10)]     133      38

ただし、populationはdict in listで扱いたいので、 DataFrameから再びdict in listに変換している。

# input
fitness = df.to_dict('records')
# output
{'score0': 342, 'score1': 37, 'param': [(36, 10), (45, 3), (81, 9), (90, 5), (90, 10)]}
{'score0': 334, 'score1': 40, 'param': [(22, 9), (65, 6), (77, 10), (89, 6), (81, 9)]}
{'score0': 319, 'score1': 29, 'param': [(22, 9), (89, 6), (71, 5), (72, 3), (65, 6)]}
{'score0': 308, 'score1': 41, 'param': [(25, 6), (81, 9), (36, 10), (89, 6), (77, 10)]}
{'score0': 295, 'score1': 26, 'param': [(77, 10), (32, 5), (52, 2), (89, 6), (45, 3)]}
{'score0': 264, 'score1': 27, 'param': [(65, 6), (72, 3), (90, 5), (25, 6), (12, 7)]}
{'score0': 253, 'score1': 27, 'param': [(65, 6), (2, 10), (89, 6), (52, 2), (45, 3)]}
{'score0': 251, 'score1': 31, 'param': [(89, 6), (47, 6), (25, 7), (65, 6), (25, 6)]}
{'score0': 248, 'score1': 25, 'param': [(72, 3), (81, 9), (52, 2), (25, 6), (18, 5)]}
{'score0': 232, 'score1': 34, 'param': [(77, 10), (12, 7), (25, 6), (47, 6), (71, 5)]}
{'score0': 229, 'score1': 32, 'param': [(71, 5), (65, 6), (36, 10), (25, 6), (32, 5)]}
{'score0': 228, 'score1': 35, 'param': [(36, 10), (18, 5), (12, 7), (90, 10), (72, 3)]}
{'score0': 225, 'score1': 36, 'param': [(77, 10), (22, 9), (90, 5), (25, 6), (11, 6)]}
{'score0': 222, 'score1': 40, 'param': [(12, 7), (36, 10), (71, 5), (22, 9), (81, 9)]}
{'score0': 220, 'score1': 29, 'param': [(52, 2), (25, 6), (18, 5), (36, 10), (89, 6)]}
{'score0': 211, 'score1': 35, 'param': [(2, 10), (32, 5), (25, 6), (81, 9), (71, 5)]}
{'score0': 203, 'score1': 38, 'param': [(36, 10), (11, 6), (2, 10), (89, 6), (65, 6)]}
{'score0': 181, 'score1': 33, 'param': [(47, 6), (2, 10), (18, 5), (25, 6), (89, 6)]}
{'score0': 138, 'score1': 36, 'param': [(18, 5), (2, 10), (71, 5), (11, 6), (36, 10)]}
{'score0': 133, 'score1': 38, 'param': [(25, 7), (11, 6), (2, 10), (18, 5), (77, 10)]}

個体と評価を保存する

ナップサック問題の個体の評価は、各品物の価格と重さを各々足し合わせるだけなので高速だが、 遺伝的アルゴリズムが扱う問題によっては評価に時間がかかることがある。 世代を重ねるごとに各個体は評価が上がり類似した遺伝子になっていく。 そのため、交叉や突然変異により生成された個体と同じ遺伝子を持つ個体が過去に存在することも珍しくない。 今後の遺伝的アルゴリズムの拡張を考えて、過去の個体とその評価を保存し、各個体を評価する前に参照するようにしておく。

保存する形式もdict in listにするが、パラメーターがlistだと一致評価できないので、Stringに変換してから保存。

self.fitness_master = {}
for fit in fitness:
    param = fit['param']
    self.fitness_master[str(param)] = {k:v for k,v in fit.items() if k!='param'}

個体の評価では、次の3パターンに別れる。

  1. スコアはないけどパラメーターはある
  2. スコアもなくてパラメーターもない
  3. スコアもパラメーターもある

1がまさに今回のパターン。2は新たな個体が生まれたとき。 3は交叉または突然変異の前の評価のとき。 このアルゴリズム特有で無駄な処理だが、個体の評価を参照することで高速処理できる。

fitness = []
for p in pop:
    if not p.has_key('score0'):
        # The indivisual made by crossover or mutation existed before
        if self.fitness_master.has_key(str(p['param'])):
            p.update(self.fitness_master[str(p['param'])])
        # The indivisual is the first
        else:
            p.update(self.clac_score(p['param']))
        fitness.append(p)
    else:
        fitness.append(p)

まとめ

実行結果は、以下の通り。 世代数が少ない、突然変異率が低いなどが原因で、 必ずしも一番価値のある商品だけで構成するとはなっていない。

               V   W  P                                    
Generation  0: 323 25 [(25, 1), (82, 3), (89, 6), (45, 10), (82, 5)]
Generation  1: 323 25 [(25, 1), (82, 3), (89, 6), (45, 10), (82, 5)]
Generation  2: 323 25 [(25, 1), (82, 3), (89, 6), (45, 10), (82, 5)]
Generation  3: 365 22 [(89, 6), (65, 2), (65, 2), (82, 5), (64, 7)]
Generation  4: 365 22 [(89, 6), (65, 2), (65, 2), (82, 5), (64, 7)]
Generation  5: 365 22 [(89, 6), (65, 2), (65, 2), (82, 5), (64, 7)]
Generation  6: 365 22 [(89, 6), (65, 2), (65, 2), (82, 5), (64, 7)]
Generation  7: 389 26 [(89, 6), (65, 2), (89, 6), (82, 5), (64, 7)]
Generation  8: 389 26 [(89, 6), (65, 2), (89, 6), (82, 5), (64, 7)]
Generation  9: 389 26 [(89, 6), (65, 2), (89, 6), (82, 5), (64, 7)]
Generation 10: 396 27 [(89, 6), (89, 6), (89, 6), (65, 2), (64, 7)]
Generation 11: 396 27 [(89, 6), (89, 6), (89, 6), (65, 2), (64, 7)]
Generation 12: 420 31 [(89, 6), (89, 6), (89, 6), (89, 6), (64, 7)]
Generation 13: 420 31 [(89, 6), (89, 6), (89, 6), (89, 6), (64, 7)]
Generation 14: 420 31 [(89, 6), (89, 6), (89, 6), (89, 6), (64, 7)]
Generation 15: 420 31 [(89, 6), (89, 6), (89, 6), (89, 6), (64, 7)]
Generation 16: 420 31 [(89, 6), (89, 6), (89, 6), (89, 6), (64, 7)]
Generation 17: 420 31 [(89, 6), (89, 6), (89, 6), (89, 6), (64, 7)]
Generation 18: 420 31 [(89, 6), (89, 6), (89, 6), (89, 6), (64, 7)]
Generation 19: 420 31 [(89, 6), (89, 6), (89, 6), (89, 6), (64, 7)]
Generation 20: 420 31 [(89, 6), (89, 6), (89, 6), (89, 6), (64, 7)]
Generation 21: 420 31 [(89, 6), (89, 6), (89, 6), (89, 6), (64, 7)]
Generation 22: 420 31 [(89, 6), (89, 6), (89, 6), (89, 6), (64, 7)]
Generation 23: 420 31 [(89, 6), (89, 6), (89, 6), (89, 6), (64, 7)]
Generation 24: 420 31 [(89, 6), (89, 6), (89, 6), (89, 6), (64, 7)]

全コードも載せておく。

# coding: utf-8

import random
import math
import copy
import operator
import pandas as pd

N_ITEMS = 20
N_POP = 20
N_GEN = 25
MUTATE_PROB = 0.1
ELITE_RATE = 0.5

class GA:
    def __init__(self):
        self.items = {}
        self.fitness_master = {}


    def main(self): 
        pop = [{'param': p} for p in self.get_population()]
        
        for g in range(N_GEN):
            print 'Generation%3s:' % str(g), 

            # Get elites
            fitness = self.evaluate(pop)
            elites = fitness[:int(len(pop)*ELITE_RATE)]

            # Cross and mutate
            pop = elites[:]
            while len(pop) < N_POP:
                if random.random() < MUTATE_PROB:
                    m = random.randint(0, len(elites)-1)
                    child = self.mutate(elites[m]['param'])
                else:
                    c1 = random.randint(0, len(elites)-1)
                    c2 = random.randint(0, len(elites)-1)
                    child = self.crossover(elites[c1]['param'], elites[c2]['param'])
                pop.append({'param': child})
            
            # Evaluate indivisual
            fitness = self.evaluate(pop)
            pop = fitness[:]

            print pop[0]['score0'], pop[0]['score1'], pop[0]['param']

            
    def get_population(self):
        # Make items
        for i in xrange(N_ITEMS):
            self.items[i] = (random.randint(0, 100), random.randint(1, 10))  # value, weight

        # Make population
        pop = []
        for i in range(N_POP):
            ind = [self.items[k] for k in random.sample(range(N_ITEMS), 5)]
            pop.append(ind)

        return pop


    def clac_score(self, indivisual):
        dic = {}
        dic['score0'] = 0  # value
        dic['score1'] = 0  # weight
        for ind in indivisual:
            dic['score0'] += ind[0]
            dic['score1'] += ind[1]
            
        return dic


    def evaluate(self, pop):
        fitness = []
        for p in pop:
            if not p.has_key('score0'):
                # The indivisual made by crossover or mutation existed before
                if self.fitness_master.has_key(str(p['param'])):
                    p.update(self.fitness_master[str(p['param'])])
                # The indivisual is the first
                else:
                    p.update(self.clac_score(p['param']))
                fitness.append(p)
            else:
                fitness.append(p)

        # Save fitness to all genaration dictinary
        for fit in fitness:
            param = fit['param']
            self.fitness_master[str(param)] = {k:v for k,v in fit.items() if k!='param'}

        # This generation fitness
        df = pd.DataFrame(fitness)
        df = df.sort(['score0', 'score1'], ascending=[False, True])

        fitness = df.to_dict('records')
        
        return fitness


    def mutate(self, parent):
        ind_idx = int(math.floor(random.random()*len(parent)))
        item_idx = random.choice(range(N_ITEMS))
        child = copy.deepcopy(parent)
        child[ind_idx] = self.items[item_idx]

        return child


    def crossover(self, parent1, parent2):
        length = len(parent1)
        r1 = int(math.floor(random.random()*length))
        r2 = r1 + int(math.floor(random.random()*(length-r1)))
        
        child = copy.deepcopy(parent1)
        child[r1:r2] = parent2[r1:r2]

        return child


if __name__ == "__main__":
    GA().main()

追記: 第一世代の扱いについて

上記だと、for文の中にevaluate()が無駄に2つ入ってるが、 1つ目は第一世代の評価にしか使わないので、for文の外に出す方が適切。 for文の中では、交叉と突然変異で生まれた世代の評価を行う。

    def main(self): 
        print('Generation  1:'), 
        pop = [{'param': p} for p in self.get_population()]
        fitness = self.evaluate(pop)
        print(fitness[0]['score0'], fitness[0]['score1'], fitness[0]['param'])

        for g in range(N_GEN-1):
            print('Generation %2s:' % str(g+2)),
            
            # Get elites
            elites = fitness[:int(len(pop)*ELITE_RATE)]

            # Cross and mutate
            pop = elites[:]
            while len(pop) < N_POP:
                if random.random() < MUTATE_PROB:
                    m = random.randint(0, len(elites)-1)
                    child = self.mutate(elites[m]['param'])
                else:
                    c1 = random.randint(0, len(elites)-1)
                    c2 = random.randint(0, len(elites)-1)
                    child = self.crossover(elites[c1]['param'], elites[c2]['param'])
                pop.append({'param': child})
            
            # Evaluate indivisual
            fitness = self.evaluate(pop)
            print(fitness[0]['score0'], fitness[0]['score1'], fitness[0]['param'])

Pandas.DataFrameをdicts in listに変換する

dicts in listからDataFrameを作成することはよくあるが、 DataFrameをdicts in listにしたい時がたまにある。 df.to_dict('records')を実行するだけで良いのだが、 このシンプルなコードを良く忘れるのに記事にした。

入出力がまったく同じdicts in listで実施。

dicts_in_list = [{'x': random.randint(0, 10), 'y': random.randint(1, 5)} for i in range(10)]
for dil in dicts_in_list:
    print dil

df = pd.DataFrame(dicts_in_list)
print df

dicts_in_list = df.to_dict('records')
for dil in dicts_in_list:
    print dil

結果は以下の通り。

{'y': 1, 'x': 9}
{'y': 2, 'x': 7}
{'y': 3, 'x': 7}
{'y': 1, 'x': 10}
{'y': 5, 'x': 8}
{'y': 2, 'x': 1}
{'y': 5, 'x': 0}
{'y': 5, 'x': 6}
{'y': 5, 'x': 3}
{'y': 5, 'x': 7}

    x  y
0   9  1
1   7  2
2   7  3
3  10  1
4   8  5
5   1  2
6   0  5
7   6  5
8   3  5
9   7  5

{'y': 1, 'x': 9}
{'y': 2, 'x': 7}
{'y': 3, 'x': 7}
{'y': 1, 'x': 10}
{'y': 5, 'x': 8}
{'y': 2, 'x': 1}
{'y': 5, 'x': 0}
{'y': 5, 'x': 6}
{'y': 5, 'x': 3}
{'y': 5, 'x': 7}

参考文献

Pandas.DataFrameの複数カラムの同時ソート

多分100回ぐらい調べてるので、いい加減覚えるために、記事を書いてみた。 ランダム生成したxとyについて、各々降順、昇順で同時にソートする。

import pandas as pd

data = [{'x': random.randint(0, 10), 'y': random.randint(1, 5)} for i in range(10)]
df = pd.DataFrame(data)
df = df.sort(['x', 'y'], ascending=[False, True])
df.reset_index(drop=True, inplace=True)

print df

xが8,4,1の場合にソートが有効になっていることが確認できる。

    x  y
0  10  1
1   8  3
2   8  5
3   6  5
4   4  1
5   4  2
6   4  5
7   1  1
8   1  5
9   0  4

参考文献

ラベルをOneHotでエンコードする

Deep Learning、と言ってもTensorFlowしか使ったことないけど、 ラベルがnx1(nはレコード数)の行列の場合は、OneHotにエンコードする必要がある(下図)。 scikit-learnのOneHotEncoderで、一発でエンコードできるのだが、 nx1の行列をnxm(mはラベルの値の種類)に変換する処理をnumpyで行う必要があるので実装した。

# nx1 matrix
label = [4 3 0 0 1 1 1 4 3 0]

# nxm matrix
onehot_label = 
[[ 0.  0.  0.  0.  1.]
 [ 0.  0.  0.  1.  0.]
 [ 1.  0.  0.  0.  0.]
 [ 1.  0.  0.  0.  0.]
 [ 0.  1.  0.  0.  0.]
 [ 0.  1.  0.  0.  0.]
 [ 0.  1.  0.  0.  0.]
 [ 0.  0.  0.  0.  1.]
 [ 0.  0.  0.  1.  0.]
 [ 1.  0.  0.  0.  0.]]

実装

numpyでreshapeとtransposeを行い、 scikit-learnでonehotにencodeする。

import numpy as np
from sklearn.preprocessing import OneHotEncoder

# generate label.
X = np.array([random.choice(range(5)) for i in range(10)])
print(X)

# reshape
X = np.array(X).reshape(1, -1)
print(X)

# transpose
X = X.transpose()
print(X)

# encode label
encoder = OneHotEncoder(n_values=max(X)+1)
X = encoder.fit_transform(X).toarray()
print(X)

各々の出力結果は以下の通り。

[2 0 0 4 3 3 4 4 2 3]

[[2 0 0 4 3 3 4 4 2 3]]

[[2]
 [0]
 [0]
 [4]
 [3]
 [3]
 [4]
 [4]
 [2]
 [3]]

[[ 0.  0.  1.  0.  0.]
 [ 1.  0.  0.  0.  0.]
 [ 1.  0.  0.  0.  0.]
 [ 0.  0.  0.  0.  1.]
 [ 0.  0.  0.  1.  0.]
 [ 0.  0.  0.  1.  0.]
 [ 0.  0.  0.  0.  1.]
 [ 0.  0.  0.  0.  1.]
 [ 0.  0.  1.  0.  0.]
 [ 0.  0.  0.  1.  0.]]

参考文献

遺伝的アルゴリズムによるOneMax問題の最適化

OneMax問題とは、[1,0,0,1,0,1,1,1,1,0]のように0/1からなる数列の和を最大化する問題のこと。 答えはすべて1の数列と自明だが、これを最適化アルゴリズムの1つである遺伝的アルゴリズムで解く。 遺伝的アルゴリズムとは、このような数列を遺伝子と見立て複数生成し、 数世代に渡り交叉と突然変異を繰り返すことにより、優秀な遺伝子を生み出すアルゴリズムのこと。 ここでの優秀の定義は「数列の和が大きい」となる。

PythonではDEAPという 遺伝的アルゴリズムのライブラリーがあるが、個人的に拡張性が低くて使いにくかったのと、 実装がそれほど難しくはないので、Pythonでスクラッチから書くことにした。

アルゴリズムの流れ

  1. 第1世代の個体群の生成
  2. エリートの選択
  3. エリートの交叉/突然変異
  4. 次世代の個体群の生成
  5. 2〜4の操作を繰り返し、適応度の最も高い個体を解として出力

簡単な流れは上記の通り。 各世代の個体群の数は固定で、現世代のエリートと、彼らの子供の和となる。

1. 第1世代の個体群の生成

第2世代以降の個体群の生成は、基本的に交叉か突然変異になる。 そのため、この操作は第1世代だけ必要になる。 ランダム選択した0/1がN_PARAM個並ぶ数列を、 N_POP個生成する。

        
    def get_population(self):
        population = []
        for i in range(N_POP):
            arr = [random.randint(0,1) for j in range(N_PARAM)]
            population.append(arr)
        
        return population

各個体は後ほど適応度を計算するが、まず-1で初期化しておく。

>>> pop = [(-1, p) for p in self.get_population()]
>>> pop
[
    (-1, [0, 0, 0, 0, 1, 1, 1, 1, 0, 0]), 
    (-1, [1, 0, 1, 0, 1, 0, 0, 1, 0, 0]),
    (-1, [1, 0, 0, 0, 1, 0, 0, 0, 0, 1]),
    ...
]

2. エリートの選択

evaluateメソッドで、個体の適応度(fitness)を計算している。 OneMaxにおける適応度はリストの和とシンプルだが、今後の拡張を考えてメソッド化(clac_score)しておく。

    def clac_score(self, x):
        return sum(x)


    def evaluate(self, pop):
        fitness = []
        for p in pop:
            if p[0] == -1:
                fitness.append((self.clac_score(p[1]), p[1]))
            else:
                fitness.append(p)
        fitness.sort()
        fitness.reverse()

        return fitness

全個体の適応度を計算してソートしたら、ELITE_RATEの割合でエリートとして選択する。

>>> fitness = self.evaluate(pop)
>>> elites = fitness[:int(len(pop)*ELITE_RATE)]

3. エリートの交叉/突然変異

突然変異mutate()では、個体の数列からパラメーターを1つだけ反転させる。 交叉crossover()では、片親の遺伝子の一部を切り取り、もう片親の同じ位置にペーストする。 これを2点交叉というが、場合によっては1点交叉にもなり得る。

    def mutate(self, parent):
        r = int(math.floor(random.random()*len(parent)))
        child = copy.deepcopy(parent)
        child[r] = (parent[r]+1) % 2

        return child


    def crossover(self, parent1, parent2):
        length = len(parent1)
        r1 = int(math.floor(random.random()*length))
        r2 = r1 + int(math.floor(random.random()*(length-r1)))
        
        child = copy.deepcopy(parent1)
        child[r1:r2] = parent2[r1:r2]

        return child

突然変異はMUTATE_BROPの確率で起こり、それ以外は交叉となる。 これを全ての個体に対して行う。個体の組合せはランダムである。

pop = elites[:]
while len(pop) < N_POP:
    if random.random() < MUTATE_PROB:
        m = random.randint(0, len(elites)-1)
        child = self.mutate(elites[m][1])
    else:
        c1 = random.randint(0, len(elites)-1)
        c2 = random.randint(0, len(elites)-1)
        child = self.crossover(elites[c1][1], elites[c2][1])
    pop.append((-1, child))

4. 次世代の個体群の生成

現世代のエリートを選択し、彼らを交叉/突然変異させた個体群を生成した。 これらの適応度を計算することで、次世代の個体群が生成される。

fitness = self.evaluate(pop)
pop = fitness[:]

結果

10個の0/1の数列のOneMax問題を解かせたところ、第4世代で最適解が得られた。

Generation: 0
(8, [1, 1, 1, 1, 1, 1, 1, 0, 0, 1])

Generation: 1
(9, [1, 1, 1, 1, 1, 1, 1, 1, 0, 1])

Generation: 2
(9, [1, 1, 1, 1, 1, 1, 1, 1, 0, 1])

Generation: 3
(10, [1, 1, 1, 1, 1, 1, 1, 1, 1, 1])

...

Generation: 24
(10, [1, 1, 1, 1, 1, 1, 1, 1, 1, 1])

ソースコード

ソースコードを以下に載せておく。1ファイルにまとまっている。$ python ga.pyで実行可能。

# coding: utf-8

import random
import math
import copy

N_PARAM = 10
N_POP = 20
N_GEN = 25
MUTATE_PROB = 0.1
ELITE_RATE = 0.2

class GA:
    def __init__(self):
        pass


    def main(self): 
        pop = [(-1, p) for p in self.get_population()]

        for g in range(N_GEN):
            print 'Generation: ' + str(g)

            # Get elites
            fitness = self.evaluate(pop)
            elites = fitness[:int(len(pop)*ELITE_RATE)]
            
            # Cross and mutate
            pop = elites[:]
            while len(pop) < N_POP:
                if random.random() < MUTATE_PROB:
                    m = random.randint(0, len(elites)-1)
                    child = self.mutate(elites[m][1])
                else:
                    c1 = random.randint(0, len(elites)-1)
                    c2 = random.randint(0, len(elites)-1)
                    child = self.crossover(elites[c1][1], elites[c2][1])
                pop.append((-1, child))
            
            # Evaluate indivisual
            fitness = self.evaluate(pop)
            pop = fitness[:]

            print pop[0]
            print


    def get_population(self):
        population = []
        for i in range(N_POP):
            arr = [random.randint(0,1) for j in range(N_PARAM)]
            population.append(arr)
        
        return population


    def clac_score(self, x):
        return sum(x)


    def evaluate(self, pop):
        fitness = []
        for p in pop:
            if p[0] == -1:
                fitness.append((self.clac_score(p[1]), p[1]))
            else:
                fitness.append(p)
        fitness.sort()
        fitness.reverse()

        return fitness


    def mutate(self, parent):
        r = int(math.floor(random.random()*len(parent)))
        child = copy.deepcopy(parent)
        child[r] = (parent[r]+1) % 2

        return child


    def crossover(self, parent1, parent2):
        length = len(parent1)
        r1 = int(math.floor(random.random()*length))
        r2 = r1 + int(math.floor(random.random()*(length-r1)))
        
        child = copy.deepcopy(parent1)
        child[r1:r2] = parent2[r1:r2]

        return child


if __name__ == "__main__":
    GA().main()

参考文献

DeepLearningの学習曲線をmatplotlibで可視化

TensorBoardのEventsによる学習状況の可視化 を行ったが、今後いろいろとカスタマイズしたくなると思い、matplotlibで可視化することにした。 今回もFizzBuzz問題を例に扱う。詳しくは、Deep Learning入門としてのFizzBuzz問題を参照。

精度とコストの記録

学習過程(train_network())で、エポックごとにtrain_losstrain_accuracytest_accuracyを算出して記録する。 記録はCSV形式で保存する。# Logging# Save logsがそれらの操作にあたる。

    def train_network(self, data, network):
        # data
        train_data  = data[0]
        train_label = data[1]
        test_data  = data[2]
        test_label = data[3]

        # model
        X  = network['X']
        Y  = network['Y']
        Y_ = network['Y_']
        loss  = network['loss']
        step  = network['step']

        # Setting
        sess = tf.InteractiveSession()
        tf.initialize_all_variables().run()
        accuracy = tf.reduce_mean(tf.cast(tf.equal(tf.argmax(Y, 1), tf.argmax(Y_, 1)), tf.float32))
        
        logs = []
        for epoch in range(N_ITER+1):
            # Randamize data
            p = np.random.permutation(range(len(train_data)))
            train_data, train_label = train_data[p], train_label[p]

            # Training
            for start in range(0, train_label.shape[0], BATCH_SIZE):
                end = start + BATCH_SIZE
                sess.run(step, feed_dict={X: train_data[start:end], Y_: train_label[start:end]})
            
            # Testing
            train_loss = sess.run(loss, feed_dict={X: train_data, Y_: train_label})
            train_accuracy = sess.run(accuracy, feed_dict={X: train_data, Y_: train_label})
            test_accuracy = sess.run(accuracy, feed_dict={X: test_data, Y_: test_label})

            # Logging
            log = {'epoch': epoch, 'train_loss': train_loss, 'train_accuracy': train_accuracy, 'test_accuracy': test_accuracy}
            logs.append(log)

            if epoch % 100 == 0:
                std_output = 'Epoch: %s, \t Train Loss: %s, \t Train Accuracy: %s, \t Test Accuracy: %s'
                print(std_output % (log['epoch'], log['train_loss'], log['train_accuracy'], log['test_accuracy']))

        # Save logs
        df = pd.DataFrame(logs)
        df.to_csv("./log/acculoss_train_%s_batch_%s_iter_%s.csv" % (TRAIN_RATE, BATCH_SIZE, N_ITER), index=False)

CSVファイルは以下のようになる。

epoch,test_accuracy,train_accuracy,train_loss
0,0.5247524976730347,0.5341278314590454,1.3275593519210815
1,0.5247524976730347,0.5341278314590454,1.284529209136963
2,0.5247524976730347,0.5341278314590454,1.2487729787826538
...
9998,0.9801980257034302,1.0,0.008503337390720844
9999,0.9801980257034302,1.0,0.008444004692137241
10000,0.9801980257034302,1.0,0.008517774753272533

可視化

保存したCSVファイルを読み込んで、精度とコストの値をセットして、 背景色がグレーなのが気に入らないのでホワイトにして、各曲線にラベルを付けて、可視化を行う。

    def main(self):
        # Read file
        df = pd.read_csv('./log/acculoss_train_0.05_batch_100_iter_10000.csv')
        
        # Set values
        x = df['epoch'].values
        y0 = df['train_loss'].values
        y1 = df['train_accuracy'].values
        y2 = df['test_accuracy'].values

        # Set background color to white
        fig = plt.figure()
        fig.patch.set_facecolor('white')

        # Plot lines
        plt.xlabel('epoch')
        plt.plot(x, y0, label='train_loss')
        plt.plot(x, y1, label='train_accuracy')
        plt.plot(x, y2, label='test_accuracy')
        plt.legend()

        # Visualize
        plt.show()

可視化すると以下のようになる。 train_accuracytrain_lossだけを見ると上手くいってるように見えるが、 test_accuracyを見ると過学習しているのが分かる。

f:id:Shoto:20170107164129p:plain

ちなみに最終結果は、以下の通り、まあまあの精度。

Epoch Train Loss Train Accuracy Test Accuracy
10000 0.00851777 1.0 0.980198

TensorBoardのEventsによる学習状況の可視化

TensorFlowに学習状況を可視化するTensorBoardというツールがあるのだが、 コスト関数と精度を可視化してみたかったので使ってみた。

実装

Deep Learning入門としてのFizzBuzz問題 のモデル学習のメソッドをベースにしている。 コメント# Logging data for TensorBoard# Write log to TensorBoardの直下のコードがTensorBoardを利用する部分のコード。

def train_model(self, data, model):
    # dataのセット
    train_X = data[0]
    train_Y = data[1]
    test_X = data[2]
    test_Y = data[3]

    # modelのセット
    X = model['X']
    Y = model['Y']
    Y_ = model['Y_']
    loss = model['loss']
    train_step = model['train_step']

    # 定義
    accuracy = tf.reduce_mean(tf.cast(tf.equal(tf.argmax(Y, 1), tf.argmax(Y_, 1)), tf.float32))

    # 初期化
    sess = tf.InteractiveSession()
    tf.initialize_all_variables().run()

    # Logging data for TensorBoard
    _ = tf.scalar_summary('loss', loss)
    _ = tf.scalar_summary('accuracy', accuracy)
    writer = tf.train.SummaryWriter('./log/', graph_def=sess.graph_def)

    for epoch in range(10000+1):
        # データのランダマイズ
        p = np.random.permutation(range(len(train_X)))
        train_X, train_Y = train_X[p], train_Y[p]

        # 学習
        for start in range(0, train_X.shape[0], 100):
            end = start + 100
            sess.run(train_step, feed_dict={X: train_X[start:end], Y_: train_Y[start:end]})

        # テスト
        if epoch % 100 == 0:
            # 教師データのコスト関数
            loss_train = sess.run(loss, feed_dict={X: train_X, Y_: train_Y})
            # 教師データの精度
            accu_train = sess.run(accuracy, feed_dict={X: train_X, Y_: train_Y})
            # テストデータのコスト関数
            loss_test = sess.run(loss, feed_dict={X: test_X, Y_: test_Y})
            # テストデータの精度
            accu_test = sess.run(accuracy, feed_dict={X: test_X, Y_: test_Y})
            # 標準出力
            std_output = 'Epoch: %s, \t Train Loss: %s, \t Train Accracy: %s, \t Test Loss: %s, \t Test Accracy: %s'
            print std_output % (epoch, round(loss_train, 6), round(accu_train, 6), round(loss_test, 6), round(accu_test, 6))

        # Write log to TensorBoard
        summary_str = sess.run(tf.merge_all_summaries(), feed_dict={X: test_X, Y_: test_Y})
        writer.add_summary(summary_str, epoch)

 

解説

コスト関数は引数として渡される。 実装はコメントの通り。

# loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(Y, Y_))
loss = model['loss']

 

精度は以下の通り。 詳しい実装内容はこちらを参照。

accuracy = tf.reduce_mean(tf.cast(tf.equal(tf.argmax(Y, 1), tf.argmax(Y_, 1)), tf.float32))

 

ここからが、TensorBoardの実装部分。 1、2行目は可視化したいコスト関数と精度の変数を出力名と共に、tf.scalar_summary()に渡している。 3行目は出力先を指定している。

# Logging data for TensorBoard
_ = tf.scalar_summary('loss', loss)
_ = tf.scalar_summary('accuracy', accuracy)
writer = tf.train.SummaryWriter('./log/', graph_def=sess.graph_def)

 

tf.merge_all_summaries()は上記で設定した_(コスト関数と精度)を指す。 これらにfeed_dictに指定したテストデータとテストラベルを渡して、結果をTensorBoardに出力する。 出力間隔はepochとしたので、for文の中で定義している。

for epoch in range(10000+1):

    ...

    # Write log to TensorBoard
    summary_str = sess.run(tf.merge_all_summaries(), feed_dict={X: test_X, Y_: test_Y})
    writer.add_summary(summary_str, epoch)

 

学習終了したら、別ターミナルで以下のコードを実行すると、 http://0.0.0.0:6006からTensorBoardを見ることができる。 ただし、1行目はvirtualenvの場合のみ実行。

$ source ~/tensorflow/bin/activate
(tensorflow) $ python ~/tensorflow/lib/python2.7/site-packages/tensorflow/tensorboard/tensorboard.py --logdir=./log/
Starting TensorBoard 16 on port 6006
(You can navigate to http://0.0.0.0:6006)

 

結果

次のように可視化される。 accuracyがサッチってないので、バッチ数などのパラメーターを変えるか、epoch数を増やす必要があることが分かる。

f:id:Shoto:20161218214154p:plainf:id:Shoto:20161218214200p:plain

参考文献