concurrent.futures

【Python】concurrent.futuresによる並列処理の基本

【Python】concurrent.futuresによる並列処理の基本

Pythonでconcurrent.futuresモジュールを使った並列処理の方法について解説します。

Pythonによる並列処理

Pythonによるマルチスレッド/マルチプロセス

プログラミングにおいて並列処理をする場合には、マルチスレッドやマルチプロセスといった方式を検討することができます。

Pythonでは、マルチスレッドのためのthreadingモジュールやマルチプロセスのためのmultiprocessingモジュールが提供されています。これらのモジュールの使い方については以下にまとめていますので興味があれば参考にしてください。

ただし、これらの並列処理に関するモジュールはスレッドやプロセスについてよく理解して使用する必要があるため少し扱いが難しく、簡単な並列処理を手軽に実装したい場合には少しハードルが高くなります。

そこで役に立つのが本記事の対象であるPythonの標準ライブラリに組み込まれているconcurrent.futuresモジュールです。

concurrent.futuresモジュール

concurrent.futuresモジュールは、Pythonの標準ライブラリの一部として提供されているモジュールで、マルチスレッド処理やマルチプロセスを簡単に実装するためのAPIを提供してくれています。

concurrent.futuresモジュールは、マルチスレッドのためのThreadPoolExecutorとマルチプロセスのためのProcessPoolExecutorという2つの実行クラスを中心に提供してくれています。ThreadPoolExecutorは、主にI/Oバウンドなタスクの並列化に適しており、ProcessPoolExecutorは、CPUバウンドなタスクの並列化に適しています。それぞれの使用方法の基本は後述します。

concurrent.futuresモジュールは、簡単な並列処理を、例えばmapメソッドを使った方法などのシンプルなAPIで実装できる点がメリットです。一方で、複雑な並列処理を実装したい場合にはカスタマイズ性に欠けるというデメリットもあります。複雑な並列処理が必要な場合は、上記で紹介したようなthreadingモジュールやmultiprocessingモジュールを使用することでより柔軟な実装ができます。

本記事の以降では、concurrent.futuresモジュールで中心的なThreadPoolExecutorProcessPoolExecutorの基本的な使い方を紹介します。

並行処理や並列処理、I/OバウンドやCPUバウンドとマルチスレッド、マルチプロセスの関係について「並行・並列処理、I/Oバウンド・CPUバウンドを理解する」でまとめていますので興味があれば参考にしてください。

concurrent.futuresの使い方

ThreadPoolExecutorを用いたマルチスレッド処理

基本的な使い方

ThreadPoolExecutorを用いたマルチスレッド処理の基本的な使い方を以下例を用いて紹介します。

import logging
import time
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def myworker(x: int, y: int):
    logging.debug("start")

    result = x + y
    time.sleep(5)

    logging.debug(f"end: {result}")
    return result


def main():
    logging.debug("start")

    # max_workersでスレッド数を指定
    with ThreadPoolExecutor(max_workers=5) as executor:
        f1 = executor.submit(myworker, 2, 3)
        f2 = executor.submit(myworker, 5, 10)
        logging.debug(f1.result())
        logging.debug(f2.result())

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
ThreadPoolExecutor-0_0: start
ThreadPoolExecutor-0_1: start
ThreadPoolExecutor-0_1: end: 15
ThreadPoolExecutor-0_0: end: 5
MainThread: 5
MainThread: 15
MainThread: end

上記のプログラムでは、myworker関数というxyという引数を受け取り、その合計を計算する処理をマルチスレッドで実行しています。マルチスレッド処理のためのスレッドプールを使うためにThreadPoolExecutorを、concurrent.futuresからインポートして使用します。

まず、ThreadPoolExecutorのwith句を使用してスレッドプールを作成します。この時に、max_workers引数で同時に実行できる最大スレッド数を指定します。今回の例では、最大で5つのスレッドが実行できる設定になっています。

スレッドを実行するためにはsubmitメソッドを使用します。executor.submitで実行する関数と当該関数に渡す引数を指定します。上記の例では2つのmyworker関数がマルチスレッドで並列動作します。

各スレッドでの実行結果は、f1.result()f2.result()のようにresultメソッドにより取得できます。このコードの部分で処理が完了いていない場合は、完了するまでブロックされます。

今回、最大スレッド数を指定する引数のmax_workers=5としているので、2つのスレッドが並列で実行されますが、例えば、max_workers=1と変更して実行するとスレッドは1つのタスクが終了した後に次のタスクが実行されます。

なお、出力には以下の部分で定義しているようにloggingを使用しています。loggingformatthreadNameを指定すると分かりやすく表示できます。

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")

loggingについては「loggingの基本的な使い方」でまとめていますので興味があれば参考にしてください。

mapを使用した複数スレッドの実行

上記で紹介した例では、各スレッド用にそれぞれでsubmitを実行しました。同じ関数を引数違いで並列実行したい場合には、以下のようにmapメソッドを使用することでまとめて実行できます。

import concurrent.futures
import logging
import time

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def myworker(x: int, y: int):
    logging.debug("start")

    result = x + y
    time.sleep(5)

    logging.debug(f"end: {result}")
    return result


def main():
    logging.debug("start")

    # max_workersでスレッド数を指定
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        args = [[2, 5], [3, 10]]
        result = executor.map(myworker, *args)

        # 返却値はイテレータ
        logging.debug(result)
        # 読みだしたタイミングで実行
        logging.debug([r for r in result])

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
ThreadPoolExecutor-0_0: start
ThreadPoolExecutor-0_1: start
MainThread: <generator object Executor.map.<locals>.result_iterator at 0x000001F348A9CA40>
ThreadPoolExecutor-0_0: end: 5
ThreadPoolExecutor-0_1: end: 15
MainThread: [5, 15]
MainThread: end

mapメソッドの使い方は、submitメソッドに似ています。引数をリストのargsとして用意して、mapメソッドに渡すときには*argsといった形で渡しています。ここでのargsは、Pythonのアンパック演算子を使用して、リスト内の複数のリストを個別の引数として関数に渡しています。

args=[[2, 5], [3, 10]]は、xのリスト[2, 5]yのリスト[3, 10]を持ったリストです。mapメソッドに渡すときに*argsとしているのは、executor.map(myworker, [2, 5], [3, 10])と同じ意味になります。

同じ関数を引数違いで実行する場合には、上記のようにmapを使用すると便利です。

Pythonのマルチスレッドでは、グローバルインタープリタロック(Global Interpreter Lock: GIL)の存在により、一度に1つのスレッドしか実行されない制限があるため、CPUバウンドの処理でのスピードアップを期待することが難しいくなります。なお、この制限はPythonのCPython実装に固有のものです。このような処理ではマルチプロセスの使用を検討すると良いでしょう。

ProcessPoolExecutorを用いたマルチプロセス

基本的な使い方

ProcessPoolExecutorを用いたマルチプロセスの基本的な使い方を以下例を用いて紹介します。

import logging
import time
from concurrent.futures import ProcessPoolExecutor

logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s")


def myworker(x: int, y: int):
    logging.debug("start")

    result = x + y
    time.sleep(5)

    logging.debug(f"end: {result}")
    return result


def main():
    logging.debug("start")

    # max_workersでプロセス数を指定
    with ProcessPoolExecutor(max_workers=5) as executor:
        f1 = executor.submit(myworker, 2, 3)
        f2 = executor.submit(myworker, 5, 10)
        logging.debug(f1.result())
        logging.debug(f2.result())

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainProcess: start
SpawnProcess-1: start
SpawnProcess-2: start
SpawnProcess-1: end: 5
MainProcess: 5
SpawnProcess-2: end: 15
MainProcess: 15
MainProcess: end

上記例を見ていただけると分かりますが、ThreadPoolExecutorと使い方は同じで、ProcessPoolExecutorwith句で各プロセスで動作させる関数と引数を指定してsubmitします。結果としては、SpawnProcess-1, SpawnProcess-2ということで、異なるプロセスで実行されていることが分かるかと思います。

異なるのは、ProcessPoolExecutorを使用すると、各タスクが異なるプロセス上で実行され、これによりPythonのGILの制約を受けずに真の並列実行が可能になる点です。

なお、logging設定をprocessNameとしてプロセス名が表示されるように変更していますので注意してください。

logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s")

このように、マルチスレッドの場合と同様の書き方でマルチプロセスの実装が可能です。

mapを使用した複数プロセスの実行

マルチスレッドの場合と同様に、各実行プロセスに渡す引数をまとめて指定して実行したい場合には、mapメソッドが使用できます。

import logging
import time
from concurrent.futures import ProcessPoolExecutor

logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s")


def myworker(x: int, y: int):
    logging.debug("start")

    result = x + y
    time.sleep(5)

    logging.debug(f"end: {result}")
    return result


def main():
    logging.debug("start")

    # max_workersでプロセス数を指定
    with ProcessPoolExecutor(max_workers=5) as executor:
        args = [[2, 5], [3, 10]]
        result = executor.map(myworker, *args)

        # 返却値はイテレータ
        logging.debug(result)
        # 読みだしたタイミングで実行
        logging.debug([r for r in result])

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainProcess: start
MainProcess: <generator object _chain_from_iterable_of_lists at 0x000001F623B0B2A0>
SpawnProcess-1: start
SpawnProcess-2: start
SpawnProcess-1: end: 5
SpawnProcess-2: end: 15
MainProcess: [5, 15]
MainProcess: end

mapメソッドの使い方は、submitメソッドに似ています。引数をリストのargsとして用意して、mapメソッドに渡すときには*argsといった形で渡しています。ここでのargsは、Pythonのアンパック演算子を使用して、リスト内の複数のリストを個別の引数として関数に渡しています。

マルチスレッドの場合と使い方が同じであることが分かるかと思いますが、SpawnProcess-1、SpawnProcess-2ということで異なるプロセスで並列実行がされています。

このようにすることで簡単にマルチプロセスを実装することが可能です。

まとめ

Pythonでconcurrent.futuresモジュールを使った並列処理の方法について解説しました。

concurrent.futuresモジュールは、簡単な並列処理を、例えばmapメソッドを使った方法などのシンプルなAPIで実装できる点がメリットです。一方で、複雑な並列処理を実装したい場合にはカスタマイズ性に欠けるというデメリットもあります。

複雑な並列処理が必要な場合や、より柔軟な実装が求められる時は、threadingモジュールやmultiprocessingモジュールを使用することを検討してください。これらのモジュールの使い方については以下にまとめていますので興味があれば参考にしてください。

concurrent.futuresモジュールは、手軽に並列処理を実装する手段として導入を検討する価値があると思いますので、是非使い方を覚えてもらえればと思います。

Note

concurrent.futuresの公式ドキュメントはこちらを参照してください。