Pythonでマルチプロセスを実装するためのmultiprocessingモジュールの使い方について解説します。
Contents
Pythonによるマルチプロセス
マルチプロセスとは、プログラムの実行方法の一つで複数のプロセスを同時に実行することを言います。Pythonでは、マルチプロセスの実装のためのmultiprocessingモジュールが提供されています。
本記事では、multiprocessingモジュールを用いたマルチプロセスの実装の基本について紹介します。
具体的なモジュールの使い方の説明の前に、まずはPythonにおけるマルチプロセスとマルチスレッドの違いについて簡単に触れておこうと思います。
Pythonにおけるマルチプロセスとマルチスレッド
Pythonには、CPython、Jython、IronPythonなどの複数の実装がありますが、最も広く使用されているのはC言語をベースにしたCPythonです。
CPythonの実装には、マルチスレッドを効果的に使用できないという大きな制限が存在します。これは、グローバルインタープリタロック(GIL)として知られています。GILは全てのPython実装に共通する性質ではなく、C言語で実装されたCPython特有のものであることを理解しておくべきです。例えば、Javaで実装されたJythonにはGILは存在しません。
GILの存在により、Pythonオブジェクトへのアクセスは一度に一つのスレッドだけに限られます。そのため、PythonでのCPUバウンドな処理では、マルチスレッドを利用しても真の並列実行が難しく、スレッドがシリアル化されて順次実行されるため、高速化されることはほとんどありません。
マルチプロセスの場合、各プロセスがGILの影響を受けない独自の実行環境を持つため、リソースをより効果的に活用できます。Pythonを使用してマルチコアCPU上でのCPU負荷が高いタスクを実行する場合、マルチプロセスの利用は非常に妥当であると言えます。
また、マルチプロセスでは、各プロセスが独立したメモリ空間を持つため、他のプロセスの影響を受けずに動作することが特徴です。対照的に、マルチスレッドではすべてのスレッドが同一のメモリ空間を共有するため、注意が必要です。
しかし、マルチプロセスを利用する際の課題として、プロセス間のデータの交換が必要になることが挙げられます。Pythonにはこのようなプロセス間の通信を助ける強力なツールが提供されており、以降でも紹介していきます。
Pythonでマルチスレッドを実現するthreadingモジュールの使い方については「threadingによるマルチスレッド処理の基本」にまとめているので興味があれば参考にしてください。
multiprocessingの使い方
基本的な使い方 Process
mutiprocessingモジュールを使って、複数のプロセスを生成し、処理を実行する基本的な使い方を以下の例で見ていきましょう。threadingモジュールを使ったことがある人はほとんど同じであることが分かるかと思います。
import logging import os import time from multiprocessing import Process logging.basicConfig( level=logging.DEBUG, format="%(processName)s_%(threadName)s: %(message)s" ) def myworker1(): logging.debug(f"start (pid:{os.getpid()})") time.sleep(5) logging.debug(f"end (pid:{os.getpid()})") def myworker2(): logging.debug(f"start (pid:{os.getpid()})") time.sleep(5) logging.debug(f"end (pid:{os.getpid()})") def main(): logging.debug(f"start (pid:{os.getpid()})") # プロセスの生成 process1 = Process(target=myworker1) process2 = Process(target=myworker2) # プロセスの開始 process1.start() process2.start() # プロセスの終了を待機 process1.join() process2.join() logging.debug(f"end (pid:{os.getpid()})") if __name__ == "__main__": main()
【実行結果】 MainProcess_MainThread: start (pid:27024) Process-1_MainThread: start (pid:28944) Process-2_MainThread: start (pid:18884) Process-2_MainThread: end (pid:18884) Process-1_MainThread: end (pid:28944) MainProcess_MainThread: end (pid:27024)
上記例は、myworker1とmyworker2という関数を別のプロセスで並列処理するプログラムとなっています。実行してみていただくとProcess-1とProcess-2というプロセスが動作し、それぞれのメインスレッド(MainThread)で処理が動作していることが分かります。以下のイメージ図のような形です。

os.getpidでProcess ID(PID)を取得し表示していますが、それぞれ異なっていることからも別プロセスであることが把握できます。
プロセスの生成は、multiprocessingモジュールのProcessを使います。以下のようにtarget引数に各プロセスで実行する関数を渡します。
# プロセスの生成 process1 = Process(target=myworker1) process2 = Process(target=myworker2)
各プロセスの終了を待機するのがjoinメソッドです。プロセスが終了したら、joinより下のコードが実行されます。
# プロセスの終了を待機 process1.join() process2.join()
プロセスの出力では、以下の部分で定義しているようにloggingを使用しています。loggingのformatでprocessNameやthreadNameを指定すると分かりやすく表示できます。
logging.basicConfig( level=logging.DEBUG, format="%(processName)s_%(threadName)s: %(message)s" )
今回は、それぞれでプロセスでMainThreadが動いていることを示すためにthreadNameを表示しましたが、以降の例では煩雑になるのでprocessNameのみ表示にします。
なお、loggingについては「loggingの基本的な使い方」でまとめていますので興味があれば参考にしてください。
上記のようにProcessを使うことでマルチプロセスでの実装をすることができます。
プロセス名称をつけたい場合
上記例では、プロセス名称(processName)が「Process-1」「Process-2」というようになっていました。プロセスに個別に名称をつけたい場合には、Process生成時に以下のようにname引数で指定します。(変更のない部分は省略します。)
...(省略)... # プロセスの生成 process1 = Process(target=myworker1, name="myworker1_process") process2 = Process(target=myworker2, name="myworker2_process") ...(省略)...
【実行結果】 MainProcess: start myworker1_process: start myworker2_process: start myworker1_process: end myworker2_process: end MainProcess: end
name引数で指定することで、loggingで出力しているprocessNameの部分は指定した名称に変わります。
プロセスで動作する関数への引数の渡し方
プロセスで動作する関数へ引数を渡したい場合には、Processの生成時に以下のようにargs引数やkwargs引数を使用します。(変更のない部分は省略します)
...(省略)... def myworker1(x: int, y: int): logging.debug("start") logging.debug(f"x: {x}, y: {y}") time.sleep(5) logging.debug("end") ...(省略)... # プロセスの生成 process1 = Process(target=myworker1, args=(10,), kwargs={"y": 20}) ...(省略)...
【実行結果】 MainProcess: start Process-1: start Process-1: x: 10, y: 20 Process-2: start Process-1: end Process-2: end MainProcess: end
上記例では、myworker1の方の関数に引数としてintのx, yがあります。この関数に引数を渡す場合には、argsにタプルで位置引数で指定するか、kwargsに辞書でキーワード引数として指定することができます。argsはタプルなので、1つ引数を渡す場合は上記の(10,)のようにカンマ(,)がいるので注意しましょう。
実行結果を見てもわかるようにProcess-1の方では、メインプロセスから引数として渡した情報を表示できていることが分かります。
マルチスレッドとマルチプロセスのメモリの扱いの違い
冒頭でマルチプロセスは各プロセスが独立したメモリ空間を持つため、他のプロセスの影響を受けずに動作するということを説明しました。対照的に、マルチスレッドではすべてのスレッドが同一のメモリ空間を共有します。
このことを簡単なプログラムで確認してみたいと思います。
マルチプロセスの場合
import logging from multiprocessing import Process logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s") def myworker(data: dict): logging.debug("start") data["x"] += 1 logging.debug(f"end: {data}, {id(data)}") def main(): logging.debug("start") # データを用意 data = {"x": 0} # プロセスを生成 process1 = Process(target=myworker, args=(data,)) process2 = Process(target=myworker, args=(data,)) # プロセスの開始 process1.start() process2.start() # プロセスの終了を待機 process1.join() process2.join() logging.debug(f"end: {data}, {id(data)}") if __name__ == "__main__": main()
【実行結果】 MainProcess: start Process-1: start Process-2: start Process-1: end: {'x': 1}, 2324205073536 Process-2: end: {'x': 1}, 1441800731136 MainProcess: end: {'x': 0}, 3244999688960
上記例では、dataという辞書を用意して、Process-1、Process-2に渡しています。この時、対象の辞書のidを確認していますが、マルチプロセスでは、MainProcess、Process-1、Process-2それぞれでidが異なっていることが分かります。
Process-1やProcess-2は、開始時点でMainProcessから子プロセスとして生成されます。このような処理をフォーク(fork)と言います。この時点でdataの辞書は{‘x’: 0}で各プロセスごとでコピーされるような形となるため、各プロセスでは1が加算されて{‘x’: 1}となっていますが、MainProcessでは{‘x’:0}のままになります。
このように、マルチプロセスでは各プロセスで独立したメモリ空間を扱います。
マルチスレッドの場合
以下は上記と全く同じコードを、マルチスレッドで実装した場合です。
import logging from threading import Thread logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s") def myworker(data: dict): logging.debug("start") data["x"] += 1 logging.debug(f"end: {data}, {id(data)}") def main(): logging.debug("start") # データを用意 data = {"x": 0} # スレッドを生成 thread1 = Thread(target=myworker, args=(data,)) thread2 = Thread(target=myworker, args=(data,)) # スレッドの開始 thread1.start() thread2.start() # スレッドの終了を待機 thread1.join() thread2.join() logging.debug(f"end: {data}, {id(data)}") if __name__ == "__main__": main()
【実行結果】 MainThread: start Thread-1 (myworker): start Thread-1 (myworker): end: {'x': 1}, 3138390283776 Thread-2 (myworker): start Thread-2 (myworker): end: {'x': 2}, 3138390283776 MainThread: end: {'x': 2}, 3138390283776
マルチスレッドでも同じように実装していますが、MainThread、Thread-1、Thread-2で対象の辞書のidは全く同じになっていることが分かります。このようにマルチスレッドでは各スレッドが同じメモリ領域を参照します。
実際に上記のプログラムはスレッドセーフではありません。安全にするにはLock等を使用するのが適切です。
上記で見たようにマルチプロセスとマルチスレッドのメモリの扱いの違いはよく理解しておくようにしましょう。
プロセスプール Pool
マルチプロセスは、各プロセスが独立したリソースを使用するため子プロセスを大量に生成してしまうことは避けるべきです。マルチプロセスを用いたアプリケーションでは、プロセスプールを構築してリソースの利用を制限するのが良い方法です。
multiprocessingモジュールでは、Poolクラスというプロセスプールのための枠組みを提供してくれています。threadingにはないため、この点はmultiprocessingの方が優れています。Poolクラスが、複数のプロセスを管理する複雑な処理を肩代わりしてくれます。
Poolは、以下のようにwithブロックで使用します。ここでは構成を説明のみとし、具体的な処理は省略しています。
# プロセスプールのサイズ PROCESS_POOL_SIZE = 3 def myworker(): ...(マルチプロセスで動作させる処理を記載)... def main(): # プロセスプールの生成 with Pool(PROCESS_POOL_SIZE) as pool: ...(処理を記載)... if __name__ == "__main__": main()
上記例では、プロセスプールのサイズを3にしているので、プロセスは同時に3つまで起動します。(処理の記載)の部分では、使用できる便利なメソッドがいくつもあります。以降ではそれらのメソッドを使用したプロセスプールの実装例を紹介していきます。
プールでのプロセス実行 apply_async
Poolを使ったプロセスプールでプロセスを実行する場合には、以下のようにapply_asyncメソッドが使用できます。
import logging import time from multiprocessing import Pool logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s") # プロセスプールのサイズ PROCESS_POOL_SIZE = 3 def myworker(x: int): logging.debug(f"start") time.sleep(5) logging.debug("end") return x def main(): logging.debug("start") # プロセスプールの生成 with Pool(PROCESS_POOL_SIZE) as pool: process1 = pool.apply_async(func=myworker, args=(10,)) process2 = pool.apply_async(func=myworker, args=(20,)) process3 = pool.apply_async(func=myworker, args=(30,)) process4 = pool.apply_async(func=myworker, args=(40,)) process5 = pool.apply_async(func=myworker, args=(50,)) logging.debug("execute async") # 実行結果を取得 logging.debug(process1.get()) logging.debug(process2.get()) logging.debug(process3.get()) logging.debug(process4.get()) logging.debug(process5.get()) logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainProcess: start MainProcess: execute async SpawnPoolWorker-1: start SpawnPoolWorker-3: start SpawnPoolWorker-2: start SpawnPoolWorker-1: end SpawnPoolWorker-3: end SpawnPoolWorker-2: end SpawnPoolWorker-1: start SpawnPoolWorker-3: start MainProcess: 10 MainProcess: 20 MainProcess: 30 SpawnPoolWorker-1: end SpawnPoolWorker-3: end MainProcess: 40 MainProcess: 50 MainProcess: end
apply_asyncの使い方は、Processの生成時とほとんど同じですが、Processでtargetに該当する引数はfuncとなっています。asyncは名前の通り非同期を意味するため、以降の”execute async”という出力が先にされていることが分かります。なお、関数の返却値は、getメソッドで取得することができます。
今回プロセスプールのサイズを3としているので、一度に実行されるプロセスは3つまでです。上記結果例では、プロセス1と3が終わり次第、残りの2つプロセスが起動し後続処理が動作していることが分かります。
timeoutの設定
apply_asyncの実行では、以下のようにtimeoutを指定することができます。変更のない部分は省略しています。timeoutは、秒単位で指定し、指定した秒数以内で処理が返ってこない場合は、TimeoutErrorとなります。
...(省略)... # 実行結果を取得 logging.debug(process1.get(timeout=1)) logging.debug(process2.get()) logging.debug(process3.get()) logging.debug(process4.get()) logging.debug(process5.get()) ...(省略)...
【実行結果】 MainProcess: start MainProcess: execute async SpawnPoolWorker-2: start SpawnPoolWorker-3: start SpawnPoolWorker-1: start Traceback (most recent call last): ...(省略)... multiprocessing.context.TimeoutError
上記例では、timeout=1としているので1秒以内に処理が返ってくる必要がありますが、myworkerの処理でsleep(5)を入れているので処理は終わりません。この場合は、上記のようにTimeoutErrorとなります。
timeout引数を使うことでタイムアウトを設定しておき、必要に応じて例外処理をするといったことが可能です。
プロセスのブロック apply
apply_asyncに似たメソッドとして、applyメソッドがあります。このメソッドは、終了するまで後続の処理をブロックします。
import logging import time from multiprocessing import Pool logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s") # プロセスプールのサイズ PROCESS_POOL_SIZE = 3 def myworker(x: int): logging.debug(f"start") time.sleep(5) logging.debug("end") return x def main(): logging.debug("start") # プロセスプールの生成 with Pool(PROCESS_POOL_SIZE) as pool: # プロセスでブロック process = pool.apply(func=myworker, args=(0,)) logging.debug(process) # 上記を実行してから並列実行 process1 = pool.apply_async(func=myworker, args=(10,)) process2 = pool.apply_async(func=myworker, args=(20,)) process3 = pool.apply_async(func=myworker, args=(30,)) process4 = pool.apply_async(func=myworker, args=(40,)) process5 = pool.apply_async(func=myworker, args=(50,)) # 実行結果を取得 logging.debug(process1.get()) logging.debug(process2.get()) logging.debug(process3.get()) logging.debug(process4.get()) logging.debug(process5.get()) logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainProcess: start SpawnPoolWorker-1: start SpawnPoolWorker-1: end MainProcess: 0 SpawnPoolWorker-2: start SpawnPoolWorker-3: start SpawnPoolWorker-1: start SpawnPoolWorker-1: end SpawnPoolWorker-3: end SpawnPoolWorker-2: end SpawnPoolWorker-1: start SpawnPoolWorker-3: start MainProcess: 10 MainProcess: 20 MainProcess: 30 SpawnPoolWorker-1: end SpawnPoolWorker-3: end MainProcess: 40 MainProcess: 50 MainProcess: end
applyの使い方は、apply_asyncと同じです。上記実行結果を見てもわかるように、まずapplyで指定した1つのプロセスが実行されて後続の処理はブロックされます。applyのプロセスが完了したらapply_asyncにより並列処理が実行されています。
このことから、並列処理という観点ではapply_asyncの方が適しています。先にある処理を実行してから後続処理を並列で実行するような、処理の順序付けをしたい場合には、applyを使うことでうまく制御できるかなと思います。
イテラブルな引数の使用 map
apply_asyncやapplyは一つの引数の組を渡してプロセスを実行しました。例えばリストで保持している引数を順に渡して処理をさせたいような場合には、for文で順次渡すといったことが必要です。しかし、Poolクラスでは、mapメソッドが用意されているので、このようなケースでも以下のようにシンプルに記載できます。
このmapメソッドは、組み込み関数のmapの並列版ともいえるものです。ただし、組み込み関数のmapは複数のイテラブルを指定できるのに対し、Poolのmapメソッドはイテラブルな引数は一つのみサポートするという点で違いがあります。
import logging import time from multiprocessing import Pool logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s") # プロセスプールのサイズ PROCESS_POOL_SIZE = 3 def myworker(x: int): logging.debug(f"start") time.sleep(5) logging.debug("end") return x * 2 def main(): logging.debug("start") values = [i for i in range(1, 6)] logging.debug(values) # プロセスプールの生成 with Pool(PROCESS_POOL_SIZE) as pool: # mapで各プロセスを実行 ↓でブロックされる result = pool.map(func=myworker, iterable=values) # 全てのプロセスが完了してから表示 logging.debug("executed") # 実行結果を表示 logging.debug(result) logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainProcess: start MainProcess: [1, 2, 3, 4, 5] SpawnPoolWorker-1: start SpawnPoolWorker-3: start SpawnPoolWorker-2: start SpawnPoolWorker-1: end SpawnPoolWorker-1: start SpawnPoolWorker-2: end SpawnPoolWorker-3: end SpawnPoolWorker-2: start SpawnPoolWorker-1: end SpawnPoolWorker-2: end MainProcess: executed MainProcess: [2, 4, 6, 8, 10] MainProcess: end
上記の例では、myworkerは受け取った値を2倍するようなものになっています。map関数は、apply_asyncやapplyと使い方は同じですが、iterable引数でイテラブルなオブジェクトを渡します。mapメソッドは、iterableで指定したイテラブルの各要素を順次渡して処理をし、処理結果を返却します。
mapメソッドでは、mapを記載した行で処理がブロックされるため、後続の”executed”という文字は全てのプロセスが完了してから表示されます。
イテラブルな引数の使用(非同期版) map_async
上記例で見たようにmapは後続の処理をブロックするような動作をしました。mapの非同期版ともいえるのが、map_asyncメソッドです。
import logging import time from multiprocessing import Pool logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s") # プロセスプールのサイズ PROCESS_POOL_SIZE = 3 def myworker(x: int): logging.debug(f"start") time.sleep(5) logging.debug("end") return x * 2 def main(): logging.debug("start") values = [i for i in range(1, 6)] logging.debug(values) # プロセスプールの生成 with Pool(PROCESS_POOL_SIZE) as pool: # map_asyncで非同期に実行 result = pool.map_async(func=myworker, iterable=values) # 非同期なのですぐ表示される logging.debug("execute async") # 実行結果を表示 logging.debug(result.get()) logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainProcess: start MainProcess: [1, 2, 3, 4, 5] MainProcess: execute async SpawnPoolWorker-1: start SpawnPoolWorker-2: start SpawnPoolWorker-3: start SpawnPoolWorker-1: end SpawnPoolWorker-2: end SpawnPoolWorker-3: end SpawnPoolWorker-1: start SpawnPoolWorker-2: start SpawnPoolWorker-2: end SpawnPoolWorker-1: end MainProcess: [2, 4, 6, 8, 10] MainProcess: end
map_asyncの使い方は、mapと同じですが、map_asyncは非同期であるため”execute async”という文字列ががすぐに表示されていることが分かります。
timeoutの設定
map_asyncの実行では、以下のようにtimeoutを指定することができます。変更のない部分は省略しています。timeoutは、秒単位で指定し、指定した秒数以内で処理が返ってこない場合は、TimeoutErrorとなります。
...(省略)... # プロセスプールの生成 with Pool(PROCESS_POOL_SIZE) as pool: # map_asyncで非同期に実行 result = pool.map_async(func=myworker, iterable=values) # 非同期なのですぐ表示される logging.debug("execute async") # 実行結果を表示 logging.debug(result.get(timeout=1)) ...(省略)...
【実行結果】 MainProcess: start MainProcess: [1, 2, 3, 4, 5] MainProcess: execute async SpawnPoolWorker-1: start SpawnPoolWorker-2: start SpawnPoolWorker-3: start Traceback (most recent call last): ...(省略)... multiprocessing.context.TimeoutError
上記例では、timeout=1としているので1秒以内に処理が返ってくる必要がありますが、処理でsleep(5)を入れているので処理は終わりません。この場合は、上記のようにTimeoutErrorとなります。
timeout引数を使うことでタイムアウトを設定しておき、必要に応じて例外処理をするといったことが可能です。
イテラブルな引数の使用(遅延評価版) imap
map、map_asyncと紹介してきましたが、imapという遅延評価版のメソッドもあります。遅延評価とは、必要になるタイミングまで値の評価がされないことを意味します。
import logging import time from multiprocessing import Pool logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s") # プロセスプールのサイズ PROCESS_POOL_SIZE = 3 def myworker(x: int): logging.debug(f"start") time.sleep(5) logging.debug("end") return x * 2 def main(): logging.debug("start") values = [i for i in range(1, 6)] logging.debug(values) # プロセスプールの生成 with Pool(PROCESS_POOL_SIZE) as pool: # imapでイテレータを生成 process_iter = pool.imap(func=myworker, iterable=values) # ここではイテレータが取得されるだけ logging.debug(process_iter) logging.debug("execute") # 使用するときにイテレータから読みだして実行 (遅延評価) for i in process_iter: logging.debug(i) logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainProcess: start MainProcess: [1, 2, 3, 4, 5] MainProcess: <multiprocessing.pool.IMapIterator object at 0x000001D3AF5D4110> MainProcess: execute SpawnPoolWorker-1: start SpawnPoolWorker-2: start SpawnPoolWorker-3: start SpawnPoolWorker-2: end SpawnPoolWorker-1: end SpawnPoolWorker-1: start SpawnPoolWorker-2: start MainProcess: 2 MainProcess: 4 SpawnPoolWorker-3: end MainProcess: 6 SpawnPoolWorker-2: end SpawnPoolWorker-1: end MainProcess: 8 MainProcess: 10 MainProcess: end
imapは、mapやmap_asyncと使用方法は同じですが、返却値がイテレータとなっています。返却値を表示してみていますが「multiprocessing.pool.IMapIterator object」となっていることからもイテレータであることが分かります。
その後、”execute”という表示をしていますが、このタイミングではプロセスの実行はされていません。具体的にfor文でイテレータから取り出しているタイミングで各プロセスで処理が実行されます。必要なタイミングで処理が動いていることから遅延評価と言います。
プロセス間のデータ交換方法
マルチプロセスでは、複数のプロセスが独立して動作するため、プロセス間のデータ交換の仕組みが必要です。以降では、multiprocessingモジュールでのプロセス間のデータ交換で使用できるQueueとPipeについて紹介します。
Queue
キューを用いたプロセス間のデータ制御をするためにmultiprocessingモジュールでは、Queueクラスが用意されています。これはqueueモジュールのQueueと同じように使用できます。また、例外についてはqueueモジュールのqueue.Emptyやqueue.Fullといった例外が送出されます。
import logging import time from multiprocessing import Process, Queue from queue import Empty logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s") # プロセスプールのサイズ PROCESS_POOL_SIZE = 3 def myworker(work_queue): logging.debug("start") # キューが空でない限り繰り返す while not work_queue.empty(): try: item = work_queue.get() except Empty: break else: time.sleep(1) logging.debug(item) logging.debug("end") def main(): logging.debug("start") # プロセスで共有するキューを用意する work_queue = Queue() # 表示したい値のリスト vals = [i for i in range(10)] # キューに投入する for val in vals: work_queue.put(val) # プロセスプール数のプロセスを用意 processes = [ Process(target=myworker, args=(work_queue,)) for _ in range(PROCESS_POOL_SIZE) ] # プロセスの開始 for process in processes: process.start() # 終了を待機 for process in processes: process.join() logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainProcess: start Process-1: start Process-3: start Process-2: start Process-2: 2 Process-3: 1 Process-1: 0 Process-2: 3 Process-3: 4 Process-1: 5 Process-1: 8 Process-3: 7 Process-2: 6 Process-3: end Process-2: end Process-1: 9 Process-1: end MainProcess: end
上記例は、Poolクラスでのプロセスプールの例をQueueを使って実装したようなものになっています。処理の構成については、threadingでキューについて説明している「キューを用いたスレッド制御」と同じのため、詳細はそちらを参照してください。
ただし、参照先のqueue.Queueを使った例とは違い、multiprocessingのQueueには、task_doneやjoinがないので注意してください。
Pipe
各プロセス間でソケットのような双方向の通信チャンネルを作成したい場合には、Pipeを使用します。Pipeは以下のように使用します。
import logging from multiprocessing import Pipe, Process logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s") class CustomClass: pass def myworker(conn): logging.debug("child: start") # 送信データの用意 data = [1, "string", {"dict": 0}, CustomClass()] logging.debug(f"child: {data}") # 親プロセスへデータを送信 conn.send(data) # コネクションをクローズ conn.close() logging.debug("child: end") def main(): logging.debug("parent: start") # 親コネクションと子コネクションを生成 # 引数指定しない場合は、デフォルトはPipe(duplex=True)で双方向になっている # Pipe(duplex=False)にするとparrent_conn:受信専用、child_conn:送信専用になる parent_conn, child_conn = Pipe() child_process = Process(target=myworker, args=(child_conn,)) # 子プロセスの開始 child_process.start() # 子プロセスからのデータを受信 data = parent_conn.recv() logging.debug(f"parent: {data}") # プロセスの終了を待機 child_process.join() logging.debug("parent: end") if __name__ == "__main__": main()
【実行結果】 MainProcess: parent: start Process-1: child: start Process-1: child: [1, 'string', {'dict': 0}, <__mp_main__.CustomClass object at 0x000001A0762E9750>] Process-1: child: end MainProcess: parent: [1, 'string', {'dict': 0}, <__main__.CustomClass object at 0x00000225FF32EE90>] MainProcess: parent: end
Pipeを使用する際には「parent_conn, child_conn = Pipe()」といったように生成します。parent_connは親コネクションで、child_connは子コネクションを表しています。
子プロセスを生成する際に、child_connを引数として渡すことで親プロセスとの通信ができるようになります。
def myworker(conn): logging.debug("child: start") # 送信データの用意 data = [1, "string", {"dict": 0}, CustomClass()] logging.debug(f"child: {data}") # 親プロセスへデータを送信 conn.send(data) # コネクションをクローズ conn.close() logging.debug("child: end")
子プロセスでは上記のように受け取ったコネクションを使用して、sendメソッドで親プロセスにデータを送信できます。コネクションは不要になったらcloseするようにしましょう。
# 子プロセスからのデータを受信 data = parent_conn.recv() logging.debug(f"parent: {data}")
親プロセス側では、上記の部分で子プロセスからのデータを受信します。データの受信では、親コネクションのrecvメソッドを使用します。これにより子プロセスからのデータを受信できるわけです。
このようにPipeを使うことで親プロセスと子プロセス間でデータのやり取りができます。なお、Pipe生成時に引数を指定しないデフォルトの場合、デフォルトはPipe(duplex=True)で双方向通信できるようになっています。そのため、parent_connの方からsendメソッドを使うこともできます。
しかし、Pipe(duplex=False)にするとparrent_connは受信専用、child_connは送信専用となります。通信方向を制限したい場合には使用を検討するとよいでしょう。もし、duplex=Falseにした状態で、parent_conn.sendを実行しようとすると以下のように例外となります。
Traceback (most recent call last): ...(省略)... OSError: connection is read-only
プロセス間におけるデータの共有
マルチプロセス間でデータをやり取りするQueueやPipeについて見てきました。プロセス間でデータをやり取りする方法としてもう一つプロセス間で共有される専用メモリを使う方法があります。以降では共有メモリを使うための仕組みをいくつか紹介します。
マルチプロセスでは、プロセス生成時のオーバーヘッドがかかったり、データを交換する手間が発生してしまいますが、メモリ空間を共有しないことは誤ってメモリのデータを破壊してしまう危険性が減るため利点と言えます。
そのため、基本的にはマルチプロセスでは、以降で紹介するデータ共有の仕組みは使わない方がいいと思います。どうしてもデータを共有しなければならないケースなどでは十分注意して使うようにしましょう。
Value, Array
プロセス間でデータを共有する簡単な方法としては、Value、Arrayを使う方法があります。これらのオブジェクトは共有ctypesオブジェクト(shared ctypes)と言い、プロセス間で共有されるオブジェクトになります。
import logging from multiprocessing import Array, Process, Value logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s") def myworker(num, arr): logging.debug("start") logging.debug(num) logging.debug(arr) # 値を変更 num.value += 1.0 for i in range(len(arr)): arr[i] *= 2 # 値の表示 logging.debug(num.value) logging.debug(arr[:]) logging.debug("end") def main(): logging.debug("start") # "d":double(倍精度浮動小数) "i":sined int(符号付き整数) 他は以下を参照 # https://docs.python.org/ja/3/library/array.html#module-array num = Value("d", 0.0) arr = Array("i", range(5)) # 値の表示 logging.debug(num.value) logging.debug(arr[:]) # プロセスを生成 p = Process(target=myworker, args=(num, arr)) # プロセスの開始 p.start() # プロセスの終了を待機 p.join() # 値の表示 logging.debug(num.value) logging.debug(arr[:]) logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainProcess: start MainProcess: 0.0 MainProcess: [0, 1, 2, 3, 4] Process-1: start Process-1: <Synchronized wrapper for c_double(0.0)> Process-1: <SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_long_Array_5 object at 0x0000017E77500DD0>> Process-1: 1.0 Process-1: [0, 2, 4, 6, 8] Process-1: end MainProcess: 1.0 MainProcess: [0, 2, 4, 6, 8] MainProcess: end
ValueやArrayの使い方は簡単で、以下のように生成して使います。
num = Value("d", 0.0) arr = Array("i", range(5)) # 値の表示 logging.debug(num.value) logging.debug(arr[:])
最初の引数に指定している”d”や”i”は、”d”がdouble(倍精度浮動小数)、”i”がsined int(符号付き整数)といった型コードを表しています。型コードの指定はこちらのページを参考にしてください。
子プロセス側で変更を加えているのは以下の部分です。
# 値を変更 num.value += 1.0 for i in range(len(arr)): arr[i] *= 2
Valueの値の変更ではnum.valueに対して操作します。また、Arrayの方はfor文で上記のように各要素にアクセスする必要があります。
上記結果を見てもわかるようにValueやArrayは共有されているため、子プロセスで変更を加えた内容を親プロセス側でも直接確認できていることが分かります。
Manager
Value、Arrayといったデータ共有の仕組みを紹介しましたが、少しPythonっぽくなく使いにくい印象を受けたかと思います。
multiprocessingモジュールには、共有データを扱いやすくするためのManagerというものがあります。Managerはサーバープロセスを管理するためのものとなっています。Managerの使い方を以下の例で見てみましょう。
import logging from multiprocessing import Manager, Process logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s") def myworker(tmp_l, tmp_d, tmp_n): logging.debug("start") tmp_l.reverse() tmp_d["x"] += 1 tmp_n.y *= 2 logging.debug(tmp_l) logging.debug(tmp_d) logging.debug(tmp_n) logging.debug("end") def main(): logging.debug("start") with Manager() as manager: tmp_l = manager.list() tmp_d = manager.dict() tmp_n = manager.Namespace() # 値を設定 tmp_l.append(1) tmp_l.append(2) tmp_l.append(3) tmp_d["x"] = 0 tmp_n.y = 1 # 実行前の値 logging.debug(tmp_l) logging.debug(tmp_d) logging.debug(tmp_n) p = Process(target=myworker, args=(tmp_l, tmp_d, tmp_n)) p.start() p.join() # 実行後の値 logging.debug(tmp_l) logging.debug(tmp_d) logging.debug(tmp_n) logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainProcess: start MainProcess: [1, 2, 3] MainProcess: {'x': 0} MainProcess: Namespace(y=1) Process-2: start Process-2: [3, 2, 1] Process-2: {'x': 1} Process-2: Namespace(y=2) Process-2: end MainProcess: [3, 2, 1] MainProcess: {'x': 1} MainProcess: Namespace(y=2) MainProcess: end
Managerは、以下の部分のようにwith句を使って使用します。
with Manager() as manager: tmp_l = manager.list() tmp_d = manager.dict() tmp_n = manager.Namespace() # 値を設定 tmp_l.append(1) tmp_l.append(2) tmp_l.append(3) tmp_d["x"] = 0 tmp_n.y = 1
上記例では、tmp_lがリスト、tmp_dが辞書で通常のリストや辞書のようにデータの操作ができます。tmp_nはNamespaceというもので「.(ドット)」を使ってクラスのプロパティのように値の操作ができます。
あとはプロセスの引数としてこれらの変数を渡してあげることで、プロセス間でデータ共有が簡単にできます。
ValueやArrayに比べてPythonのオブジェクトのように扱うことができるため便利です。ただし、注意点としてValueやArrayに比べて処理が遅いという特徴がありますので、使いどころをについてはよく検討してもらえると良いかと思います。
異なるサーバー間でのデータ共有
Managerは、ネットワーク越しで異なるサーバー間でデータ共有することも可能です。以下の例で見てみましょう。
server.py
import queue from multiprocessing.managers import BaseManager class QueueManager(BaseManager): pass def main(): q = queue.Queue() # マネージャーで呼び出し可能なオブジェクト登録 QueueManager.register("get_queue", callable=lambda: q) # マネージャーを取得 # address: マネージャープロセスが新たなコネクションを待ち受けるアドレス # authkey: サーバープロセスへのコネクションを検証するための認証キー manager = QueueManager(address=("localhost", 50000), authkey=b"P@ssw0rd") # サーバーを取得 server = manager.get_server() # サーバーを起動する server.serve_forever() if __name__ == "__main__": main()
client1.py(データをキューに登録)
from multiprocessing.managers import BaseManager class QueueManager(BaseManager): pass def main(): # オブジェクト登録 QueueManager.register("get_queue") # マネージャーを取得 manager = QueueManager(address=("localhost", 50000), authkey=b"P@ssw0rd") # サーバーへ接続 manager.connect() queue = manager.get_queue() queue.put("hello") if __name__ == "__main__": main()
client2.py(データをキューから取り出す)
from multiprocessing.managers import BaseManager class QueueManager(BaseManager): pass def main(): # オブジェクト登録 QueueManager.register("get_queue") # マネージャーを取得 manager = QueueManager(address=("localhost", 50000), authkey=b"P@ssw0rd") # サーバーへ接続 manager.connect() queue = manager.get_queue() print(queue.get()) if __name__ == "__main__": main()
【実行結果】()内は手順の補足 (1. サーバーの立ち上げ) > python server.py (2. 別のターミナルを立ち上げて実行: "hello"がキューにputされる) > python client1.py (3. 別のターミナルを立ち上げて実行: "hello"がキューからgetされて表示される。キューに値がない場合は待ち状態になり、client1でputされたら表示される) > python .\multi_client2.py hello
上記例では、3つのプログラムを作っています。server.pyがサーバーで動作し、client1.pyがデータをキューに登録し、client2.pyがキューからデータを取り出すクライアントとして動いているようなイメージを持ってもらえればと思います。
server.pyの内容
まずは、server.pyの内容から見ていきましょう。
from multiprocessing.managers import BaseManager class QueueManager(BaseManager): pass
multiprocessing.managersからBaseManagerをインポートし、BaseManagerを継承したQueueManagerというクラスを作成しています。
def main(): q = queue.Queue() # マネージャーで呼び出し可能なオブジェクト登録 QueueManager.register("get_queue", callable=lambda: q) # マネージャーを取得 # address: マネージャープロセスが新たなコネクションを待ち受けるアドレス # authkey: サーバープロセスへのコネクションを検証するための認証キー manager = QueueManager(address=("localhost", 50000), authkey=b"P@ssw0rd") # サーバーを取得 server = manager.get_server() # サーバーを起動する server.serve_forever()
メイン関数では、queueモジュールからQueueを生成し、registerメソッドで、呼び出し可能なものとして登録をしています。これにより、manager.get_queue()でキューを取り出すことができます。
manager = QueueManager(address=("localhost", 50000), authkey=b"P@ssw0rd")
QueueManagerの生成時には、addressを使ってコネクションを待ち受けるアドレスとポートを指定し、authkeyで認証キーを設定します。これらの値をclient側で用いることでアクセスできるようになります。今回は、自端末で確認できるようにlocalhostとしていますが、実際にはサーバーのアドレスを入力します。
# サーバーを取得 server = manager.get_server() # サーバーを起動する server.serve_forever()
あとは、サーバーを取得、起動します。serve_foreverを実行することでサーバー機能が常時動く状態となります。
client1.pyの内容
次にデータをキューに登録するclient1.pyの処理を見てみます。作りはserver.pyに非常に似ており、クラス定義やオブジェクト登録、マネージャーの取得の部分は同じです。
# サーバーへ接続 manager.connect() queue = manager.get_queue() queue.put("hello")
サーバーへ接続しているところが上記の部分ですが、connectを使うことでサーバーに接続します。get_queueによりキューを取得できるので、キューに”hello”という文字列をputで登録しています。
client2.pyの内容
最後に、キューから値を取り出すclient2.pyの処理ですが、client1.pyとほとんど同じです。
# サーバーへ接続 manager.connect() queue = manager.get_queue() print(queue.get())
キューの操作では、putではなくgetで値を取得している点がclient1と異なります。
処理の実行手順
動作の確認の手順についてです。まずは、server1.pyを実行します。serve_foreverでサーバーが起動状態となります。
次に別のプロンプトを開いてclient1.pyを実行します。これによりキューに”hello”という文字列が登録されます。
最後にまた別のターミナルを開いてclient2.pyを実行します。すると、client1.pyがキューに登録した”hello”という文字列が表示されます。
複数回client2.pyを実行すると待ち状態になります。これは、キューに値が入らないので待っている状態です。その状態で、client1.pyを実行すると、client2.py側ですぐに”hello”と表示されます。
今回の実行手順は自端末(localhost)での確認方法で紹介しましたが、それぞれサーバー、クライアントで実行することで異なるサーバー間でもデータ共有が可能です。
まとめ
Pythonでマルチプロセスを実装するためのmultiprocessingモジュールの使い方について解説しました。
並列処理を実装する場合には、マルチスレッドとマルチプロセスという選択肢が考えられます。Pythonにおいてはグローバルインタープリタロック(GIL)の影響でマルチスレッドではCPUバウンドな処理では効果を発揮できない特徴があります。
マルチプロセスの場合、各プロセスがGILの影響を受けない独自の実行環境を持つため、リソースをより効果的に活用できます。Pythonを使用してマルチコアCPU上でのCPU負荷が高いタスクを実行する場合、マルチプロセスの利用は非常に妥当であると言えます。
multiprocessingモジュールでは、マルチプロセスを実行するために便利なプロセスプール(Pool)や便利なデータ交換方法(Queue、Pipe)・データ共有方法(Value、Array、Manager)を提供してくれます。
並列処理は難しい領域だと思いますが、ぜひmultiprocessingモジュールの使い方を覚えてもらえるといいかと思います。
multiprocessingの公式ドキュメントはこちらを参照してください。