Pythonでconcurrent.futures
モジュールを使った並列処理の方法について解説します。
Contents
Pythonによる並列処理
Pythonによるマルチスレッド/マルチプロセス
プログラミングにおいて並列処理をする場合には、マルチスレッドやマルチプロセスといった方式を検討することができます。
Pythonでは、マルチスレッドのためのthreading
モジュールやマルチプロセスのためのmultiprocessing
モジュールが提供されています。これらのモジュールの使い方については以下にまとめていますので興味があれば参考にしてください。
ただし、これらの並列処理に関するモジュールはスレッドやプロセスについてよく理解して使用する必要があるため少し扱いが難しく、簡単な並列処理を手軽に実装したい場合には少しハードルが高くなります。
そこで役に立つのが本記事の対象であるPythonの標準ライブラリに組み込まれているconcurrent.futures
モジュールです。
concurrent.futuresモジュール
concurrent.futures
モジュールは、Pythonの標準ライブラリの一部として提供されているモジュールで、マルチスレッド処理やマルチプロセスを簡単に実装するためのAPIを提供してくれています。
concurrent.futures
モジュールは、マルチスレッドのためのThreadPoolExecutor
とマルチプロセスのためのProcessPoolExecutor
という2つの実行クラスを中心に提供してくれています。ThreadPoolExecutor
は、主にI/Oバウンドなタスクの並列化に適しており、ProcessPoolExecutor
は、CPUバウンドなタスクの並列化に適しています。それぞれの使用方法の基本は後述します。
concurrent.futures
モジュールは、簡単な並列処理を、例えばmap
メソッドを使った方法などのシンプルなAPIで実装できる点がメリットです。一方で、複雑な並列処理を実装したい場合にはカスタマイズ性に欠けるというデメリットもあります。複雑な並列処理が必要な場合は、上記で紹介したようなthreading
モジュールやmultiprocessing
モジュールを使用することでより柔軟な実装ができます。
本記事の以降では、concurrent.futures
モジュールで中心的なThreadPoolExecutor
やProcessPoolExecutor
の基本的な使い方を紹介します。
並行処理や並列処理、I/OバウンドやCPUバウンドとマルチスレッド、マルチプロセスの関係について「並行・並列処理、I/Oバウンド・CPUバウンドを理解する」でまとめていますので興味があれば参考にしてください。
concurrent.futuresの使い方
ThreadPoolExecutorを用いたマルチスレッド処理
基本的な使い方
ThreadPoolExecutor
を用いたマルチスレッド処理の基本的な使い方を以下例を用いて紹介します。
import logging import time from concurrent.futures import ThreadPoolExecutor logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s") def myworker(x: int, y: int): logging.debug("start") result = x + y time.sleep(5) logging.debug(f"end: {result}") return result def main(): logging.debug("start") # max_workersでスレッド数を指定 with ThreadPoolExecutor(max_workers=5) as executor: f1 = executor.submit(myworker, 2, 3) f2 = executor.submit(myworker, 5, 10) logging.debug(f1.result()) logging.debug(f2.result()) logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainThread: start ThreadPoolExecutor-0_0: start ThreadPoolExecutor-0_1: start ThreadPoolExecutor-0_1: end: 15 ThreadPoolExecutor-0_0: end: 5 MainThread: 5 MainThread: 15 MainThread: end
上記のプログラムでは、myworker
関数というx
とy
という引数を受け取り、その合計を計算する処理をマルチスレッドで実行しています。マルチスレッド処理のためのスレッドプールを使うためにThreadPoolExecutor
を、concurrent.futures
からインポートして使用します。
まず、ThreadPoolExecutor
のwith句を使用してスレッドプールを作成します。この時に、max_workers
引数で同時に実行できる最大スレッド数を指定します。今回の例では、最大で5つのスレッドが実行できる設定になっています。
スレッドを実行するためにはsubmit
メソッドを使用します。executor.submit
で実行する関数と当該関数に渡す引数を指定します。上記の例では2つのmyworker関数がマルチスレッドで並列動作します。
各スレッドでの実行結果は、f1.result()
やf2.result()
のようにresult
メソッドにより取得できます。このコードの部分で処理が完了いていない場合は、完了するまでブロックされます。
今回、最大スレッド数を指定する引数のmax_workers=5
としているので、2つのスレッドが並列で実行されますが、例えば、max_workers=1
と変更して実行するとスレッドは1つのタスクが終了した後に次のタスクが実行されます。
なお、出力には以下の部分で定義しているようにlogging
を使用しています。logging
のformat
でthreadName
を指定すると分かりやすく表示できます。
logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")
logging
については「loggingの基本的な使い方」でまとめていますので興味があれば参考にしてください。
mapを使用した複数スレッドの実行
上記で紹介した例では、各スレッド用にそれぞれでsubmit
を実行しました。同じ関数を引数違いで並列実行したい場合には、以下のようにmap
メソッドを使用することでまとめて実行できます。
import concurrent.futures import logging import time logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s") def myworker(x: int, y: int): logging.debug("start") result = x + y time.sleep(5) logging.debug(f"end: {result}") return result def main(): logging.debug("start") # max_workersでスレッド数を指定 with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: args = [[2, 5], [3, 10]] result = executor.map(myworker, *args) # 返却値はイテレータ logging.debug(result) # 読みだしたタイミングで実行 logging.debug([r for r in result]) logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainThread: start ThreadPoolExecutor-0_0: start ThreadPoolExecutor-0_1: start MainThread: <generator object Executor.map.<locals>.result_iterator at 0x000001F348A9CA40> ThreadPoolExecutor-0_0: end: 5 ThreadPoolExecutor-0_1: end: 15 MainThread: [5, 15] MainThread: end
map
メソッドの使い方は、submit
メソッドに似ています。引数をリストのargs
として用意して、map
メソッドに渡すときには*args
といった形で渡しています。ここでのargs
は、Pythonのアンパック演算子を使用して、リスト内の複数のリストを個別の引数として関数に渡しています。
args=[[2, 5], [3, 10]]
は、x
のリスト[2, 5]
とy
のリスト[3, 10]
を持ったリストです。map
メソッドに渡すときに*args
としているのは、executor.map(myworker, [2, 5], [3, 10])
と同じ意味になります。
同じ関数を引数違いで実行する場合には、上記のようにmap
を使用すると便利です。
Pythonのマルチスレッドでは、グローバルインタープリタロック(Global Interpreter Lock: GIL)の存在により、一度に1つのスレッドしか実行されない制限があるため、CPUバウンドの処理でのスピードアップを期待することが難しいくなります。なお、この制限はPythonのCPython実装に固有のものです。このような処理ではマルチプロセスの使用を検討すると良いでしょう。
ProcessPoolExecutorを用いたマルチプロセス
基本的な使い方
ProcessPoolExecutor
を用いたマルチプロセスの基本的な使い方を以下例を用いて紹介します。
import logging import time from concurrent.futures import ProcessPoolExecutor logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s") def myworker(x: int, y: int): logging.debug("start") result = x + y time.sleep(5) logging.debug(f"end: {result}") return result def main(): logging.debug("start") # max_workersでプロセス数を指定 with ProcessPoolExecutor(max_workers=5) as executor: f1 = executor.submit(myworker, 2, 3) f2 = executor.submit(myworker, 5, 10) logging.debug(f1.result()) logging.debug(f2.result()) logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainProcess: start SpawnProcess-1: start SpawnProcess-2: start SpawnProcess-1: end: 5 MainProcess: 5 SpawnProcess-2: end: 15 MainProcess: 15 MainProcess: end
上記例を見ていただけると分かりますが、ThreadPoolExecutor
と使い方は同じで、ProcessPoolExecutor
のwith
句で各プロセスで動作させる関数と引数を指定してsubmit
します。結果としては、SpawnProcess-1, SpawnProcess-2ということで、異なるプロセスで実行されていることが分かるかと思います。
異なるのは、ProcessPoolExecutor
を使用すると、各タスクが異なるプロセス上で実行され、これによりPythonのGILの制約を受けずに真の並列実行が可能になる点です。
なお、logging
設定をprocessName
としてプロセス名が表示されるように変更していますので注意してください。
logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s")
このように、マルチスレッドの場合と同様の書き方でマルチプロセスの実装が可能です。
mapを使用した複数プロセスの実行
マルチスレッドの場合と同様に、各実行プロセスに渡す引数をまとめて指定して実行したい場合には、map
メソッドが使用できます。
import logging import time from concurrent.futures import ProcessPoolExecutor logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s") def myworker(x: int, y: int): logging.debug("start") result = x + y time.sleep(5) logging.debug(f"end: {result}") return result def main(): logging.debug("start") # max_workersでプロセス数を指定 with ProcessPoolExecutor(max_workers=5) as executor: args = [[2, 5], [3, 10]] result = executor.map(myworker, *args) # 返却値はイテレータ logging.debug(result) # 読みだしたタイミングで実行 logging.debug([r for r in result]) logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainProcess: start MainProcess: <generator object _chain_from_iterable_of_lists at 0x000001F623B0B2A0> SpawnProcess-1: start SpawnProcess-2: start SpawnProcess-1: end: 5 SpawnProcess-2: end: 15 MainProcess: [5, 15] MainProcess: end
map
メソッドの使い方は、submit
メソッドに似ています。引数をリストのargs
として用意して、map
メソッドに渡すときには*args
といった形で渡しています。ここでのargs
は、Pythonのアンパック演算子を使用して、リスト内の複数のリストを個別の引数として関数に渡しています。
マルチスレッドの場合と使い方が同じであることが分かるかと思いますが、SpawnProcess-1、SpawnProcess-2ということで異なるプロセスで並列実行がされています。
このようにすることで簡単にマルチプロセスを実装することが可能です。
まとめ
Pythonでconcurrent.futures
モジュールを使った並列処理の方法について解説しました。
concurrent.futures
モジュールは、簡単な並列処理を、例えばmap
メソッドを使った方法などのシンプルなAPIで実装できる点がメリットです。一方で、複雑な並列処理を実装したい場合にはカスタマイズ性に欠けるというデメリットもあります。
複雑な並列処理が必要な場合や、より柔軟な実装が求められる時は、threading
モジュールやmultiprocessing
モジュールを使用することを検討してください。これらのモジュールの使い方については以下にまとめていますので興味があれば参考にしてください。
concurrent.futures
モジュールは、手軽に並列処理を実装する手段として導入を検討する価値があると思いますので、是非使い方を覚えてもらえればと思います。
concurrent.futuresの公式ドキュメントはこちらを参照してください。
上記で紹介しているソースコードについてはgithubにて公開しています。参考にしていただければと思います。