Pythonでマルチプロセスを実装するためのmultiprocessing
モジュールの使い方について解説します。
Contents
Pythonによるマルチプロセス
マルチプロセスとは、複数の独立したプロセスを用いてタスクを実行する方法のことを言います。Pythonでは、マルチプロセスの実装のためのmultiprocessing
モジュールが提供されています。
本記事では、multiprocessing
モジュールを用いたマルチプロセスの実装の基本について紹介します。
具体的なモジュールの使い方の説明の前に、まずはPythonにおけるマルチプロセスとマルチスレッドの違いについて簡単に触れておこうと思います。
Pythonにおけるマルチプロセスとマルチスレッド
Pythonには、CPython、Jython、IronPythonなどの複数の実装がありますが、最も広く使用されているのはC言語をベースにしたCPythonです。
CPythonにおいては、マルチスレッドを使用することは可能ですが、グローバルインタープリタロック(GIL)の存在により、CPUバウンドなタスクにおいてマルチスレッドの利点を最大限に活用できないという制約があります。GILは全てのPython実装に共通する性質ではなく、C言語で実装されたCPython特有のものであることを理解しておくべきです。例えば、Javaで実装されたJythonにはGILは存在しません。
GILの存在により、Pythonオブジェクトへのアクセスは一度に一つのスレッドだけに限られます。そのため、PythonでのCPUバウンドな処理では、マルチスレッドを利用しても真の並列実行が難しく、スレッドがシリアル化されて順次実行されるため、高速化されることはほとんどありません。
マルチプロセスの場合、各プロセスがGILの影響を受けない独自の実行環境を持つため、リソースをより効果的に活用できます。Pythonを使用してマルチコアCPU上でのCPU負荷が高いタスクを実行する場合、マルチプロセスの利用は非常に妥当であると言えます。
また、マルチプロセスでは、各プロセスが独立したメモリ空間を持つため、他のプロセスの影響を受けずに動作することが特徴です。対照的に、マルチスレッドではすべてのスレッドが同一のメモリ空間を共有するため、データの安全性に関する注意が必要です。
しかし、マルチプロセスを利用する際の課題として、プロセス間のデータの交換が必要になることが挙げられます。Pythonにはこのようなプロセス間の通信を助ける強力なツールが提供されており、以降でも紹介していきます。
なお、Pythonでマルチスレッドを実現するthreadingモジュールの使い方については「threadingによるマルチスレッド処理の基本」を参考にしてください。
並行処理や並列処理、I/OバウンドやCPUバウンドとマルチスレッド、マルチプロセスの関係について「並行・並列処理、I/Oバウンド・CPUバウンドを理解する」でまとめていますので興味があれば参考にしてください。
multiprocessingの使い方
基本的な使い方 Process
multiprocessing
モジュールを使って、複数のプロセスを生成し、処理を実行する基本的な使い方を以下の例で見ていきましょう。threadingモジュールを使ったことがある人はほとんど同じであることが分かるかと思います。
import logging import os import time from multiprocessing import Process logging.basicConfig( level=logging.DEBUG, format="%(processName)s_%(threadName)s: %(message)s" ) def myworker1(): logging.debug(f"start (pid:{os.getpid()})") time.sleep(5) logging.debug(f"end (pid:{os.getpid()})") def myworker2(): logging.debug(f"start (pid:{os.getpid()})") time.sleep(5) logging.debug(f"end (pid:{os.getpid()})") def main(): logging.debug(f"start (pid:{os.getpid()})") # プロセスの生成 process1 = Process(target=myworker1) process2 = Process(target=myworker2) # プロセスの開始 process1.start() process2.start() # プロセスの終了を待機 process1.join() process2.join() logging.debug(f"end (pid:{os.getpid()})") if __name__ == "__main__": main()
【実行結果】 MainProcess_MainThread: start (pid:27024) Process-1_MainThread: start (pid:28944) Process-2_MainThread: start (pid:18884) Process-2_MainThread: end (pid:18884) Process-1_MainThread: end (pid:28944) MainProcess_MainThread: end (pid:27024)
上記例は、myworker1
とmyworker2
という関数を別のプロセスで並列処理するプログラムとなっています。実行してみていただくとProcess-1とProcess-2というプロセスが動作し、それぞれのメインスレッド(MainThread)で処理が動作していることが分かります。以下のイメージ図のような形です。
os.getpid
でProcess ID(PID)を取得し表示していますが、それぞれ異なっていることからも別プロセスであることが把握できます。
プロセスの生成は、multiprocessing
モジュールのProcess
を使います。以下のようにtarget
引数に各プロセスで実行する関数を渡します。
# プロセスの生成 process1 = Process(target=myworker1) process2 = Process(target=myworker2)
各プロセスの終了を待機するのがjoin
メソッドです。プロセスが終了したら、join
より下のコードが実行されます。
# プロセスの終了を待機 process1.join() process2.join()
プロセスの出力では、以下の部分で定義しているようにlogging
モジュールを使用しています。logging
のformat
でprocessName
やthreadName
を指定すると分かりやすく表示できます。
logging.basicConfig( level=logging.DEBUG, format="%(processName)s_%(threadName)s: %(message)s" )
今回は、それぞれでプロセスでMainThreadが動いていることを示すためにthreadName
を表示しましたが、以降の例では煩雑になるのでprocessName
のみ表示にします。
なお、logging
については「loggingの基本的な使い方」でまとめていますので興味があれば参考にしてください。
上記のようにProcess
を使うことで簡単にマルチプロセスの実装をすることができます。
プロセス名称をつけたい場合
上記例では、プロセス名称(processName
)が「Process-1」「Process-2」というようになっていました。プロセスに個別に名称をつけたい場合には、Process
生成時に以下のようにname
引数で指定します。(変更のない部分は省略します。)
...(省略)... # プロセスの生成 process1 = Process(target=myworker1, name="myworker1_process") process2 = Process(target=myworker2, name="myworker2_process") ...(省略)...
【実行結果】 MainProcess: start myworker1_process: start myworker2_process: start myworker1_process: end myworker2_process: end MainProcess: end
name
引数で指定することで、logging
で出力しているprocessName
の部分は指定した名称に変わります。
プロセスで動作する関数への引数の渡し方
プロセスで動作する関数へ引数を渡したい場合には、Process
の生成時に以下のようにargs
引数やkwargs
引数を使用します。(変更のない部分は省略します)
...(省略)... def myworker1(x: int, y: int): logging.debug("start") logging.debug(f"x: {x}, y: {y}") time.sleep(5) logging.debug("end") ...(省略)... # プロセスの生成 process1 = Process(target=myworker1, args=(10,), kwargs={"y": 20}) ...(省略)...
【実行結果】 MainProcess: start Process-1: start Process-1: x: 10, y: 20 Process-2: start Process-1: end Process-2: end MainProcess: end
上記例では、myworker1
の方の関数に引数としてint
のx
, y
があります。この関数に引数を渡す場合には、args
にタプルで位置引数で指定するか、kwargs
に辞書でキーワード引数として指定することができます。args
はタプルなので、1つ引数を渡す場合は上記の(10,)
のようにカンマ(,)がいるので注意しましょう。
実行結果を見てもわかるようにProcess-1の方では、メインプロセスから引数として渡した情報を表示できていることが分かります。
マルチスレッドとマルチプロセスのメモリの扱いの違い
冒頭でマルチプロセスは各プロセスが独立したメモリ空間を持つため、他のプロセスの影響を受けずに動作するということを説明しました。対照的に、マルチスレッドではすべてのスレッドが同一のメモリ空間を共有します。
このことを簡単なプログラムで確認してみたいと思います。
マルチプロセスの場合
import logging from multiprocessing import Process logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s") def myworker(data: dict): logging.debug("start") data["x"] += 1 logging.debug(f"end: {data}, {id(data)}") def main(): logging.debug("start") # データを用意 data = {"x": 0} # プロセスを生成 process1 = Process(target=myworker, args=(data,)) process2 = Process(target=myworker, args=(data,)) # プロセスの開始 process1.start() process2.start() # プロセスの終了を待機 process1.join() process2.join() logging.debug(f"end: {data}, {id(data)}") if __name__ == "__main__": main()
【実行結果】 MainProcess: start Process-1: start Process-2: start Process-1: end: {'x': 1}, 2324205073536 Process-2: end: {'x': 1}, 1441800731136 MainProcess: end: {'x': 0}, 3244999688960
上記例では、data
という辞書を用意して、Process-1、Process-2に渡しています。この時、対象の辞書のid
を確認していますが、マルチプロセスでは、MainProcess、Process-1、Process-2それぞれでid
が異なっていることが分かります。
Process-1やProcess-2は、開始時点でMainProcessから子プロセスとして生成されます。このような処理をフォーク(fork)と言います。この時点でdata
の辞書は{'x': 0}
で各プロセスごとでコピーされるような形となるため、各プロセスでは1が加算されて{'x': 1}
となっていますが、MainProcessでは{'x':0}
のままになります。
このように、マルチプロセスでは各プロセスで独立したメモリ空間を扱います。
マルチスレッドの場合
以下は上記と全く同じコードを、マルチスレッドで実装した場合です。
import logging from threading import Thread logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s") def myworker(data: dict): logging.debug("start") data["x"] += 1 logging.debug(f"end: {data}, {id(data)}") def main(): logging.debug("start") # データを用意 data = {"x": 0} # スレッドを生成 thread1 = Thread(target=myworker, args=(data,)) thread2 = Thread(target=myworker, args=(data,)) # スレッドの開始 thread1.start() thread2.start() # スレッドの終了を待機 thread1.join() thread2.join() logging.debug(f"end: {data}, {id(data)}") if __name__ == "__main__": main()
【実行結果】 MainThread: start Thread-1 (myworker): start Thread-1 (myworker): end: {'x': 1}, 3138390283776 Thread-2 (myworker): start Thread-2 (myworker): end: {'x': 2}, 3138390283776 MainThread: end: {'x': 2}, 3138390283776
マルチスレッドでも同じように実装していますが、MainThread、Thread-1、Thread-2で対象の辞書のid
は全く同じになっていることが分かります。このようにマルチスレッドでは各スレッドが同じメモリ領域を参照します。
実際に上記のプログラムはスレッドセーフではありません。特にdata["x"] +=1
という部分は、複数のスレッドから同時にアクセスされると不正な結果を生じる可能性があります。安全にデータの更新を行うにはLock
等を使用して排他制御を行うのが適切です。
上記で見たようにマルチプロセスとマルチスレッドのメモリの扱いの違いはよく理解しておくようにしましょう。
プロセスプール Pool
マルチプロセスは、各プロセスが独立したリソースを使用するため子プロセスを大量に生成してしまうことは避けるべきです。マルチプロセスを用いたアプリケーションでは、プロセスプールを構築してリソースの利用を制限するのが良い方法です。
multiprocessing
モジュールでは、Pool
クラスというプロセスプールのための枠組みを提供してくれています。これは、threading
モジュールには含まれておらず、この点はmultiprocessing
の方が便利だと感じる点です。Pool
クラスが、複数のプロセスの管理を簡単にしてくれます。
Pool
は、以下のようにwith
句で使用します。ここでは構成を説明のみとし、具体的な処理は省略しています。
# プロセスプールのサイズ PROCESS_POOL_SIZE = 3 def myworker(): ...(マルチプロセスで動作させる処理を記載)... def main(): # プロセスプールの生成 with Pool(PROCESS_POOL_SIZE) as pool: ...(処理を記載)... if __name__ == "__main__": main()
上記例では、プロセスプールのサイズを3にしているので、プロセスは同時に3つまで起動します。(処理の記載)の部分では、使用できる便利なメソッドがいくつもあります。以降ではそれらのメソッドを使用したプロセスプールの実装例を紹介していきます。
プールでのプロセス実行 apply_async
Pool
を使ったプロセスプールでプロセスを実行する場合には、以下のようにapply_async
メソッドが使用できます。
import logging import time from multiprocessing import Pool logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s") # プロセスプールのサイズ PROCESS_POOL_SIZE = 3 def myworker(x: int): logging.debug(f"start") time.sleep(5) logging.debug("end") return x def main(): logging.debug("start") # プロセスプールの生成 with Pool(PROCESS_POOL_SIZE) as pool: process1 = pool.apply_async(func=myworker, args=(10,)) process2 = pool.apply_async(func=myworker, args=(20,)) process3 = pool.apply_async(func=myworker, args=(30,)) process4 = pool.apply_async(func=myworker, args=(40,)) process5 = pool.apply_async(func=myworker, args=(50,)) logging.debug("execute async") # 実行結果を取得 logging.debug(process1.get()) logging.debug(process2.get()) logging.debug(process3.get()) logging.debug(process4.get()) logging.debug(process5.get()) logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainProcess: start MainProcess: execute async SpawnPoolWorker-1: start SpawnPoolWorker-3: start SpawnPoolWorker-2: start SpawnPoolWorker-1: end SpawnPoolWorker-3: end SpawnPoolWorker-2: end SpawnPoolWorker-1: start SpawnPoolWorker-3: start MainProcess: 10 MainProcess: 20 MainProcess: 30 SpawnPoolWorker-1: end SpawnPoolWorker-3: end MainProcess: 40 MainProcess: 50 MainProcess: end
apply_async
の使い方は、Process
と似ていますが、target
に相当する部分がfunc
となっています。asyncは非同期を意味するため、以降の”execute async”という出力が先にされていることが分かります。なお、関数の返却値は、get
メソッドで取得できます。
今回プロセスプールのサイズを3としているので、一度に実行されるプロセスは3つまでです。上記結果例では、プロセス1と3が終わり次第、残りの2つプロセスが起動し後続処理が動作していることが分かります。
timeoutの設定
apply_async
の実行では、以下のようにtimeout
を指定することができます。変更のない部分は省略しています。timeout
は、秒単位で指定し、指定した秒数以内で処理が返ってこない場合は、TimeoutError
となります。
...(省略)... # 実行結果を取得 logging.debug(process1.get(timeout=1)) logging.debug(process2.get()) logging.debug(process3.get()) logging.debug(process4.get()) logging.debug(process5.get()) ...(省略)...
【実行結果】 MainProcess: start MainProcess: execute async SpawnPoolWorker-2: start SpawnPoolWorker-3: start SpawnPoolWorker-1: start Traceback (most recent call last): ...(省略)... multiprocessing.context.TimeoutError
上記例では、timeout=1
としているので1秒以内に処理が返ってくる必要がありますが、myworker
の処理でtime.sleep(5)
を入れているので処理は1秒以内には終わりません。この場合は、上記のようにTimeoutError
となります。
timeout
引数を使うことでタイムアウトを設定しておき、必要に応じて例外処理をするといったことが可能です。
プロセスのブロック apply
apply_async
に似たメソッドとして、apply
メソッドがあります。このメソッドは、終了するまで後続の処理をブロックします。
import logging import time from multiprocessing import Pool logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s") # プロセスプールのサイズ PROCESS_POOL_SIZE = 3 def myworker(x: int): logging.debug(f"start") time.sleep(5) logging.debug("end") return x def main(): logging.debug("start") # プロセスプールの生成 with Pool(PROCESS_POOL_SIZE) as pool: # プロセスでブロック process = pool.apply(func=myworker, args=(0,)) logging.debug(process) # 上記を実行してから並列実行 process1 = pool.apply_async(func=myworker, args=(10,)) process2 = pool.apply_async(func=myworker, args=(20,)) process3 = pool.apply_async(func=myworker, args=(30,)) process4 = pool.apply_async(func=myworker, args=(40,)) process5 = pool.apply_async(func=myworker, args=(50,)) # 実行結果を取得 logging.debug(process1.get()) logging.debug(process2.get()) logging.debug(process3.get()) logging.debug(process4.get()) logging.debug(process5.get()) logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainProcess: start SpawnPoolWorker-1: start SpawnPoolWorker-1: end MainProcess: 0 SpawnPoolWorker-2: start SpawnPoolWorker-3: start SpawnPoolWorker-1: start SpawnPoolWorker-1: end SpawnPoolWorker-3: end SpawnPoolWorker-2: end SpawnPoolWorker-1: start SpawnPoolWorker-3: start MainProcess: 10 MainProcess: 20 MainProcess: 30 SpawnPoolWorker-1: end SpawnPoolWorker-3: end MainProcess: 40 MainProcess: 50 MainProcess: end
apply
の使い方は、apply_async
と似ていますが、apply
は同期的に動作し、指定した関数の終了を待ちます。上記実行結果を見ても分かるように、まずapply
で指定した1つのプロセスが実行されて、後続の処理はブロックされます。
このことから、並列処理という観点ではapply_async
の方が適しています。先にある処理を実行してから後続処理を並列で実行するような、処理の順序付けをしたい場合には、apply
を使うことでうまく制御できるかと思います。
イテラブルな引数の使用 map
apply_async
やapply
は一つの引数の組を渡してプロセスを実行しました。例えばリストで保持している引数を順に渡して処理をさせたいような場合には、for
文で順次渡すといったことが必要です。しかし、Pool
クラスでは、map
メソッドが用意されているので、このようなケースでも以下のようにシンプルに記載できます。
このmap
メソッドは、組み込み関数のmap
の並列版ともいえるものです。ただし、組み込み関数のmap
は複数のイテラブルを指定できるのに対し、Pool
のmap
メソッドはイテラブルな引数は一つのみサポートするという点で違いがあります。
import logging import time from multiprocessing import Pool logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s") # プロセスプールのサイズ PROCESS_POOL_SIZE = 3 def myworker(x: int): logging.debug(f"start") time.sleep(5) logging.debug("end") return x * 2 def main(): logging.debug("start") values = [i for i in range(1, 6)] logging.debug(values) # プロセスプールの生成 with Pool(PROCESS_POOL_SIZE) as pool: # mapで各プロセスを実行 ↓でブロックされる result = pool.map(func=myworker, iterable=values) # 全てのプロセスが完了してから表示 logging.debug("executed") # 実行結果を表示 logging.debug(result) logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainProcess: start MainProcess: [1, 2, 3, 4, 5] SpawnPoolWorker-1: start SpawnPoolWorker-3: start SpawnPoolWorker-2: start SpawnPoolWorker-1: end SpawnPoolWorker-1: start SpawnPoolWorker-2: end SpawnPoolWorker-3: end SpawnPoolWorker-2: start SpawnPoolWorker-1: end SpawnPoolWorker-2: end MainProcess: executed MainProcess: [2, 4, 6, 8, 10] MainProcess: end
上記の例では、myworker
は受け取った値を2倍するようなものになっています。map
関数は、apply_async
やapply
とは異なり、イテラブルな引数を受け取り、その各要素を関数に適用します。map
メソッドは、iterable
で指定したイテラブルの各要素を順次渡して処理をし、処理結果を返却します。
map
メソッドでは、map
を記載した行で処理がブロックされるため、後続の”executed”という文字は全てのプロセスが完了してから表示されます。
イテラブルな引数の使用(非同期版) map_async
上記例で見たようにmap
は後続の処理をブロックするような動作をしました。map
の非同期版ともいえるのが、map_async
メソッドです。
import logging import time from multiprocessing import Pool logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s") # プロセスプールのサイズ PROCESS_POOL_SIZE = 3 def myworker(x: int): logging.debug(f"start") time.sleep(5) logging.debug("end") return x * 2 def main(): logging.debug("start") values = [i for i in range(1, 6)] logging.debug(values) # プロセスプールの生成 with Pool(PROCESS_POOL_SIZE) as pool: # map_asyncで非同期に実行 result = pool.map_async(func=myworker, iterable=values) # 非同期なのですぐ表示される logging.debug("execute async") # 実行結果を表示 logging.debug(result.get()) logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainProcess: start MainProcess: [1, 2, 3, 4, 5] MainProcess: execute async SpawnPoolWorker-1: start SpawnPoolWorker-2: start SpawnPoolWorker-3: start SpawnPoolWorker-1: end SpawnPoolWorker-2: end SpawnPoolWorker-3: end SpawnPoolWorker-1: start SpawnPoolWorker-2: start SpawnPoolWorker-2: end SpawnPoolWorker-1: end MainProcess: [2, 4, 6, 8, 10] MainProcess: end
map_async
の使い方は、map
と同じですが、map_async
は非同期であるため”execute async”という文字列ががすぐに表示されていることが分かります。
timeoutの設定
map_async
の実行では、以下のようにtimeout
を指定することができます。変更のない部分は省略しています。timeout
は、秒単位で指定し、指定した秒数以内で処理が返ってこない場合は、TimeoutError
となります。
...(省略)... # プロセスプールの生成 with Pool(PROCESS_POOL_SIZE) as pool: # map_asyncで非同期に実行 result = pool.map_async(func=myworker, iterable=values) # 非同期なのですぐ表示される logging.debug("execute async") # 実行結果を表示 logging.debug(result.get(timeout=1)) ...(省略)...
【実行結果】 MainProcess: start MainProcess: [1, 2, 3, 4, 5] MainProcess: execute async SpawnPoolWorker-1: start SpawnPoolWorker-2: start SpawnPoolWorker-3: start Traceback (most recent call last): ...(省略)... multiprocessing.context.TimeoutError
上記例では、timeout=1
としているので1秒以内に処理が返ってくる必要がありますが、処理でtime.sleep(5)
を入れているので処理は1秒以内には終わりません。この場合は、上記のようにTimeoutError
となります。
timeout
引数を使うことでタイムアウトを設定しておき、必要に応じて例外処理をするといったことが可能です。
イテラブルな引数の使用(遅延評価版) imap
map
、map_async
と紹介してきましたが、imap
という遅延評価版のメソッドもあります。遅延評価とは、必要になるタイミングまで値の評価がされないことを意味します。
import logging import time from multiprocessing import Pool logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s") # プロセスプールのサイズ PROCESS_POOL_SIZE = 3 def myworker(x: int): logging.debug(f"start") time.sleep(5) logging.debug("end") return x * 2 def main(): logging.debug("start") values = [i for i in range(1, 6)] logging.debug(values) # プロセスプールの生成 with Pool(PROCESS_POOL_SIZE) as pool: # imapでイテレータを生成 process_iter = pool.imap(func=myworker, iterable=values) # ここではイテレータが取得されるだけ logging.debug(process_iter) logging.debug("execute") # 使用するときにイテレータから読みだして実行 (遅延評価) for i in process_iter: logging.debug(i) logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainProcess: start MainProcess: [1, 2, 3, 4, 5] MainProcess: <multiprocessing.pool.IMapIterator object at 0x000001D3AF5D4110> MainProcess: execute SpawnPoolWorker-1: start SpawnPoolWorker-2: start SpawnPoolWorker-3: start SpawnPoolWorker-2: end SpawnPoolWorker-1: end SpawnPoolWorker-1: start SpawnPoolWorker-2: start MainProcess: 2 MainProcess: 4 SpawnPoolWorker-3: end MainProcess: 6 SpawnPoolWorker-2: end SpawnPoolWorker-1: end MainProcess: 8 MainProcess: 10 MainProcess: end
imap
は、map
やmap_async
と使用方法は同じですが、返却値がイテレータとなっています。返却値を表示してみていますが「multiprocessing.pool.IMapIterator object」となっていることからもイテレータであることが分かります。
その後、”execute”という表示をしていますが、このタイミングではプロセスの実行はされていません。具体的にfor文でイテレータから取り出しているタイミングで各プロセスで処理が実行されます。必要なタイミングで処理が動いていることから遅延評価と言います。
プロセス間のデータ交換方法
マルチプロセスでは、複数のプロセスが独立して動作するため、プロセス間のデータ交換の仕組みが必要です。以降では、multiprocessing
モジュールでのプロセス間のデータ交換で使用できるQueue
とPipe
について紹介します。
Queue
キューを用いたプロセス間のデータ制御をするためにmultiprocessing
モジュールでは、Queue
クラスが用意されています。これはqueue
モジュールのQueue
と似ていますが、完全に同じように使用できるわけではありません。実際、multiprocessing.Queue
はプロセス間の通信に特化しており、queue.Queue
はスレッド間の通信を目的としています。また、例外についてはqueue
モジュールのqueue.Empty
やqueue.Full
といった例外が送出されます。
import logging import time from multiprocessing import Process, Queue from queue import Empty logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s") # プロセスプールのサイズ PROCESS_POOL_SIZE = 3 def myworker(work_queue): logging.debug("start") # キューが空でない限り繰り返す while not work_queue.empty(): try: item = work_queue.get_nowait() except Empty: break else: time.sleep(1) logging.debug(item) logging.debug("end") def main(): logging.debug("start") # プロセスで共有するキューを用意する work_queue = Queue() # 表示したい値のリスト vals = [i for i in range(10)] # キューに投入する for val in vals: work_queue.put(val) # プロセスプール数のプロセスを用意 processes = [ Process(target=myworker, args=(work_queue,)) for _ in range(PROCESS_POOL_SIZE) ] # プロセスの開始 for process in processes: process.start() # 終了を待機 for process in processes: process.join() logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainProcess: start Process-1: start Process-3: start Process-2: start Process-2: 2 Process-3: 1 Process-1: 0 Process-2: 3 Process-3: 4 Process-1: 5 Process-1: 8 Process-3: 7 Process-2: 6 Process-3: end Process-2: end Process-1: 9 Process-1: end MainProcess: end
上記例は、Pool
クラスでのプロセスプールの例をQueue
を使って実装したようなものになっています。処理の構成については、threading
におけるでキューの使用を説明している「キューを用いたスレッド制御」と同じのため、詳細はそちらを参照してください。
ただし、参照先ページのqueue.Queue
の例では、task_done
やjoin
といったメソッドがありますが、multiprocessing
のQueue
には、task_done
やjoin
といったメソッドがないので注意してください。
Pipe
各プロセス間でソケットのような双方向の通信チャンネルを作成したい場合には、Pipe
を使用します。Pipe
は以下のように使用します。
import logging from multiprocessing import Pipe, Process logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s") class CustomClass: pass def myworker(conn): logging.debug("child: start") # 送信データの用意 data = [1, "string", {"dict": 0}, CustomClass()] logging.debug(f"child: {data}") # 親プロセスへデータを送信 conn.send(data) # コネクションをクローズ conn.close() logging.debug("child: end") def main(): logging.debug("parent: start") # 親コネクションと子コネクションを生成 # 引数指定しない場合は、デフォルトはPipe(duplex=True)で双方向になっている # Pipe(duplex=False)にするとparrent_conn:受信専用、child_conn:送信専用になる parent_conn, child_conn = Pipe() child_process = Process(target=myworker, args=(child_conn,)) # 子プロセスの開始 child_process.start() # 子プロセスからのデータを受信 data = parent_conn.recv() logging.debug(f"parent: {data}") # プロセスの終了を待機 child_process.join() logging.debug("parent: end") if __name__ == "__main__": main()
【実行結果】 MainProcess: parent: start Process-1: child: start Process-1: child: [1, 'string', {'dict': 0}, <__mp_main__.CustomClass object at 0x000001A0762E9750>] Process-1: child: end MainProcess: parent: [1, 'string', {'dict': 0}, <__main__.CustomClass object at 0x00000225FF32EE90>] MainProcess: parent: end
Pipe
を使用する際には「parent_conn, child_conn = Pipe()
」といったように生成します。parent_conn
は親コネクションで、child_conn
は子コネクションを表しています。
子プロセスを生成する際に、child_conn
を引数として渡すことで親プロセスとの通信ができるようになります。
def myworker(conn): logging.debug("child: start") # 送信データの用意 data = [1, "string", {"dict": 0}, CustomClass()] logging.debug(f"child: {data}") # 親プロセスへデータを送信 conn.send(data) # コネクションをクローズ conn.close() logging.debug("child: end")
子プロセスでは上記のように受け取ったコネクションを使用して、send
メソッドで親プロセスにデータを送信できます。コネクションは不要になったらclose
するようにしましょう。
# 子プロセスからのデータを受信 data = parent_conn.recv() logging.debug(f"parent: {data}")
親プロセス側では、上記の部分で子プロセスからのデータを受信します。データの受信では、親コネクションのrecv
メソッドを使用します。これにより子プロセスからのデータを受信できます。
このようにPipe
を使うことで親プロセスと子プロセス間でデータのやり取りができます。なお、Pipe
生成時に引数を指定しないデフォルトの場合、デフォルトはPipe(duplex=True)
で双方向通信できるようになっています。そのため、parent_conn
の方からsend
メソッドを使うこともできます。
しかし、Pipe(duplex=False)
にするとparrent_conn
は受信専用、child_conn
は送信専用となります。通信方向を制限したい場合には使用を検討するとよいでしょう。もし、duplex=False
にした状態で、parent_conn.send
を実行しようとすると以下のように例外となります。
Traceback (most recent call last): ...(省略)... OSError: connection is read-only
なお、上記の例ではカスタムクラスのオブジェクトをやり取りするようになっていますが、この場合、オブジェクトがpickle可能であることが必要です。オブジェクトがpickle可能であるというのは、pickle
モジュールを使用してシリアル化とデシリアル化が可能であることを意味し、オブジェクトにより可否が分かれるため注意しましょう。
プロセス間におけるデータの共有
マルチプロセス間でデータをやり取りするQueue
やPipe
について見てきました。プロセス間でデータをやり取りする方法としてもう一つプロセス間で共有される専用メモリを使う方法があります。以降では共有メモリを使うための仕組みをいくつか紹介します。
マルチプロセスでは、プロセス生成時のオーバーヘッドがかかったり、データを交換する手間が発生したりしますが、メモリ空間を共有しないことは誤ってメモリのデータを破壊してしまう危険性が減るため利点と言えます。
そのため、基本的にはマルチプロセスでは、以降で紹介するデータ共有の仕組みは使わない方がいいと思います。どうしてもデータを共有しなければならないケースなどでは十分注意して使うようにしましょう。
Value, Array
プロセス間でデータを共有する簡単な方法としては、Value
、Array
を使う方法があります。これらのオブジェクトは共有ctypesオブジェクト(shared ctypes)と言い、プロセス間で共有されるオブジェクトになります。
import logging from multiprocessing import Array, Process, Value logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s") def myworker(num, arr): logging.debug("start") logging.debug(num) logging.debug(arr) # 値を変更 num.value += 1.0 for i in range(len(arr)): arr[i] *= 2 # 値の表示 logging.debug(num.value) logging.debug(arr[:]) logging.debug("end") def main(): logging.debug("start") # "d":double(倍精度浮動小数) "i":sined int(符号付き整数) 他は以下を参照 # https://docs.python.org/ja/3/library/array.html#module-array num = Value("d", 0.0) arr = Array("i", range(5)) # 値の表示 logging.debug(num.value) logging.debug(arr[:]) # プロセスを生成 p = Process(target=myworker, args=(num, arr)) # プロセスの開始 p.start() # プロセスの終了を待機 p.join() # 値の表示 logging.debug(num.value) logging.debug(arr[:]) logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainProcess: start MainProcess: 0.0 MainProcess: [0, 1, 2, 3, 4] Process-1: start Process-1: <Synchronized wrapper for c_double(0.0)> Process-1: <SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_long_Array_5 object at 0x0000017E77500DD0>> Process-1: 1.0 Process-1: [0, 2, 4, 6, 8] Process-1: end MainProcess: 1.0 MainProcess: [0, 2, 4, 6, 8] MainProcess: end
Value
やArray
の使い方は簡単で、以下のように生成して使います。
num = Value("d", 0.0) arr = Array("i", range(5)) # 値の表示 logging.debug(num.value) logging.debug(arr[:])
最初の引数に指定している”d”
や"i"
は、"d"
がdouble(倍精度浮動小数)、"i"
がsigned int(符号付き整数)といった型コードを表しています。型コードの指定はこちらのページを参考にしてください。
子プロセス側で変更を加えているのは以下の部分です。
# 値を変更 num.value += 1.0 for i in range(len(arr)): arr[i] *= 2
Value
の値の変更ではnum.value
に対して操作します。また、Array
の方はfor
文で上記のように各要素にアクセスする必要があります。
上記結果を見てもわかるようにValue
やArray
は共有されているため、子プロセスで変更を加えた内容を親プロセス側でも直接確認できていることが分かります。
Manager
Value
、Array
といったデータ共有の仕組みを紹介しましたが、少しPythonらしくなく使いにくい印象を受けたかと思います。
multiprocessing
モジュールには、共有データを扱いやすくするためのManager
というものがあります。Manager
はサーバープロセスを管理するためのものとなっています。Manager
の使い方を以下の例で見てみましょう。
import logging from multiprocessing import Manager, Process logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s") def myworker(tmp_l, tmp_d, tmp_n): logging.debug("start") tmp_l.reverse() tmp_d["x"] += 1 tmp_n.y *= 2 logging.debug(tmp_l) logging.debug(tmp_d) logging.debug(tmp_n) logging.debug("end") def main(): logging.debug("start") with Manager() as manager: tmp_l = manager.list() tmp_d = manager.dict() tmp_n = manager.Namespace() # 値を設定 tmp_l.append(1) tmp_l.append(2) tmp_l.append(3) tmp_d["x"] = 0 tmp_n.y = 1 # 実行前の値 logging.debug(tmp_l) logging.debug(tmp_d) logging.debug(tmp_n) p = Process(target=myworker, args=(tmp_l, tmp_d, tmp_n)) p.start() p.join() # 実行後の値 logging.debug(tmp_l) logging.debug(tmp_d) logging.debug(tmp_n) logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainProcess: start MainProcess: [1, 2, 3] MainProcess: {'x': 0} MainProcess: Namespace(y=1) Process-2: start Process-2: [3, 2, 1] Process-2: {'x': 1} Process-2: Namespace(y=2) Process-2: end MainProcess: [3, 2, 1] MainProcess: {'x': 1} MainProcess: Namespace(y=2) MainProcess: end
Manager
は、以下の部分のようにwith
句を使って使用します。
with Manager() as manager: tmp_l = manager.list() tmp_d = manager.dict() tmp_n = manager.Namespace() # 値を設定 tmp_l.append(1) tmp_l.append(2) tmp_l.append(3) tmp_d["x"] = 0 tmp_n.y = 1
上記例では、tmp_l
がリスト、tmp_d
が辞書で通常のリストや辞書のようにデータの操作ができます。tmp_n
はNamespaceというもので「.(ドット)」を使ってクラスのプロパティのように値の操作ができます。
あとはプロセスの引数としてこれらの変数を渡してあげることで、プロセス間でデータ共有が簡単にできます。
Value
やArray
に比べてPythonのオブジェクトのように扱うことができるため便利です。ただし、注意点としてValue
やArray
に比べて処理が遅いという特徴がありますので、使いどころをについてはよく検討してもらえると良いかと思います。
異なるサーバー間でのデータ共有
Manager
は、ネットワーク越しで異なるサーバー間でデータ共有することも可能です。以下の例で見てみましょう。
server.py
import queue from multiprocessing.managers import BaseManager class QueueManager(BaseManager): pass def main(): q = queue.Queue() # マネージャーで呼び出し可能なオブジェクト登録 QueueManager.register("get_queue", callable=lambda: q) # マネージャーを取得 # address: マネージャープロセスが新たなコネクションを待ち受けるアドレス # authkey: サーバープロセスへのコネクションを検証するための認証キー manager = QueueManager(address=("localhost", 50000), authkey=b"P@ssw0rd") # サーバーを取得 server = manager.get_server() # サーバーを起動する server.serve_forever() if __name__ == "__main__": main()
client1.py(データをキューに登録)
from multiprocessing.managers import BaseManager class QueueManager(BaseManager): pass def main(): # オブジェクト登録 QueueManager.register("get_queue") # マネージャーを取得 manager = QueueManager(address=("localhost", 50000), authkey=b"P@ssw0rd") # サーバーへ接続 manager.connect() queue = manager.get_queue() queue.put("hello") if __name__ == "__main__": main()
client2.py(データをキューから取り出す)
from multiprocessing.managers import BaseManager class QueueManager(BaseManager): pass def main(): # オブジェクト登録 QueueManager.register("get_queue") # マネージャーを取得 manager = QueueManager(address=("localhost", 50000), authkey=b"P@ssw0rd") # サーバーへ接続 manager.connect() queue = manager.get_queue() print(queue.get()) if __name__ == "__main__": main()
【実行結果】()内は手順の補足 (1. サーバーの立ち上げ) > python server.py (2. 別のターミナルを立ち上げて実行: "hello"がキューにputされる) > python client1.py (3. 別のターミナルを立ち上げて実行: "hello"がキューからgetされて表示される。キューに値がない場合は待ち状態になり、client1でputされたら表示される) > python .\multi_client2.py hello
上記例では、3つのプログラムを作っています。server.pyがサーバーで動作し、client1.pyがデータをキューに登録し、client2.pyがキューからデータを取り出すクライアントとして動いているようなイメージを持ってもらえればと思います。
server.pyの内容
まずは、server.pyの内容から見ていきましょう。
from multiprocessing.managers import BaseManager class QueueManager(BaseManager): pass
multiprocessing.managers
からBaseManager
をインポートし、BaseManager
を継承したQueueManager
というクラスを作成しています。
def main(): q = queue.Queue() # マネージャーで呼び出し可能なオブジェクト登録 QueueManager.register("get_queue", callable=lambda: q) # マネージャーを取得 # address: マネージャープロセスが新たなコネクションを待ち受けるアドレス # authkey: サーバープロセスへのコネクションを検証するための認証キー manager = QueueManager(address=("localhost", 50000), authkey=b"P@ssw0rd") # サーバーを取得 server = manager.get_server() # サーバーを起動する server.serve_forever()
メイン関数では、queue
モジュールからQueue
を生成し、register
メソッドで、呼び出し可能なものとして登録をしています。これにより、manager.get_queue()
でキューを取り出すことができます。
manager = QueueManager(address=("localhost", 50000), authkey=b"P@ssw0rd")
QueueManager
の生成時には、address
を使ってコネクションを待ち受けるアドレスとポートを指定し、authkey
で認証キーを設定します。これらの値をclient側で用いることでアクセスできるようになります。今回は、自端末で確認できるようにlocalhostとしていますが、実際にはサーバーのアドレスを入力します。
# サーバーを取得 server = manager.get_server() # サーバーを起動する server.serve_forever()
あとは、サーバーを取得、起動します。serve_forever
を実行することでサーバー機能が常時動く状態となります。
client1.pyの内容
次にデータをキューに登録するclient1.pyの処理を見てみます。作りはserver.pyに非常に似ており、クラス定義やオブジェクト登録、マネージャーの取得の部分は同じです。
# サーバーへ接続 manager.connect() queue = manager.get_queue() queue.put("hello")
サーバーへ接続しているところが上記の部分ですが、connect
を使うことでサーバーに接続します。get_queue
によりキューを取得できるので、キューに”hello”という文字列をput
で登録しています。
client2.pyの内容
最後に、キューから値を取り出すclient2.pyの処理ですが、client1.pyとほとんど同じです。
# サーバーへ接続 manager.connect() queue = manager.get_queue() print(queue.get())
キューの操作では、put
ではなくget
で値を取得している点がclient1と異なります。
処理の実行手順
動作の確認の手順についてです。まずは、server1.pyを実行します。serve_forever
でサーバーが起動状態となります。
次に別のプロンプトを開いてclient1.pyを実行します。これによりキューに”hello”という文字列が登録されます。
最後にまた別のターミナルを開いてclient2.pyを実行します。すると、client1.pyがキューに登録した”hello”という文字列が表示されます。
複数回client2.pyを実行すると待ち状態になります。これは、キューに値が入らないので待っている状態です。その状態で、client1.pyを実行すると、client2.py側ですぐに”hello”と表示されます。
今回の実行手順は自端末(localhost)での確認方法で紹介しましたが、それぞれサーバー、クライアントで実行することで異なるサーバー間でもデータ共有が可能です。
まとめ
Pythonでマルチプロセスを実装するためのmultiprocessing
モジュールの使い方について解説しました。
並列処理を実装する場合には、マルチスレッドとマルチプロセスという選択肢が考えられます。Pythonにおいてはグローバルインタープリタロック(GIL)の影響でマルチスレッドではCPUバウンドな処理では十分な効果を発揮できない可能性があります。
マルチプロセスの場合、各プロセスがGILの影響を受けない独自の実行環境を持つため、リソースをより効果的に活用できます。Pythonを使用してマルチコアCPU上でのCPU負荷が高いタスクを実行する場合、マルチプロセスの利用は非常に妥当であると言えます。
multiprocessing
モジュールでは、マルチプロセスを実行するためにプロセスプール(Pool
)やデータ交換方法(Queue
、Pipe
)・データ共有方法(Value
、Array
、Manager
)といった便利な機能を提供してくれます。
並列処理は難しいですが、ぜひmultiprocessing
モジュールの使い方を覚えてもらえるといいかと思います。
multiprocessingの公式ドキュメントはこちらを参照してください。
上記で紹介しているソースコードについてはgithubにて公開しています。参考にしていただければと思います。