読者です 読者をやめる 読者になる 読者になる

Pythonの並列処理

最近Pythonの並列処理をよく使うのでまとめておく。

基本形

並列処理したいメソッドを別に書いてPoolから呼び出す。 multiprocessing.cpu_count()はシステムのCPU数を返す。 僕の環境では4。デュアルコアなのでスレッド数だと思う。

import multiprocessing

def f(x):
    return x*x

n = multiprocessing.cpu_count()
p = multiprocessing.Pool(n)
params = range(1,4)
result = p.map(f, params)

1から3までを二乗しているので、次のような結果になる。

>>> print result
[1, 4, 9]

複数の引数を渡す

Pool().map()には引数は1つしか渡せない。 しかし、これは複数の引数を1つの引数にまとめることで解決する。 下記ではdictを利用しているが、listでもtupleでもいい。

import multiprocessing

def f(param):
    return param['x']*param['y']

n = multiprocessing.cpu_count()
p = multiprocessing.Pool(n)
# Make one param including multi params.
params = [{'x': i, 'y': i+5} for i in range(1,4)]
result = p.map(f, params)

この結果、2つのパラメーターが渡され、 [1*(1+5), 2*(2+5), 3*(3+5)]を計算した結果が出力される。

>>> print result
[6, 14, 24]

プロセスのメモリ解放

並列処理するメソッド内で、DBを読み込む処理を行い、 更にこの並列処理の何度も繰り返し呼び出していたら、 OSError: [Errno 24] Too many open filesというエラーが出た。 プロセスを終了して、メモリを開放する必要があるが、 Pool().close()とPool().terminate()を呼び出すことでエラーが回避された。

import multiprocessing

def f(param):
    return param['x']*param['y']

n = multiprocessing.cpu_count()
p = multiprocessing.Pool(n)
params = [{'x': i, 'y': i+5} for i in range(1,4)]
result = p.map(f, params)
p.close()  # add this.
p.terminate()  # add this.

プロセス終了後もresultに結果が残っている。

>>> print result
[6, 14, 24]

ちなみに今回追加した処理の内容は次の通り。

  • close()
    これ以上プールでタスクが実行されないようにします。すべてのタスクが完了した後でワーカープロセスが終了します。

  • terminate()
    実行中の処理を完了させずにワーカープロセスをすぐに停止します。プールオブジェクトがガベージコレクトされるときに terminate() が呼び出されます。

参考文献