multiprocessing

【Python】multiprocessingによるマルチプロセスの基本

【Python】multiprocessingによるマルチプロセスの基本

Pythonでマルチプロセスを実装するためのmultiprocessingモジュールの使い方について解説します。

Pythonによるマルチプロセス

マルチプロセスとは、複数の独立したプロセスを用いてタスクを実行する方法のことを言います。Pythonでは、マルチプロセスの実装のためのmultiprocessingモジュールが提供されています。

本記事では、multiprocessingモジュールを用いたマルチプロセスの実装の基本について紹介します。

具体的なモジュールの使い方の説明の前に、まずはPythonにおけるマルチプロセスとマルチスレッドの違いについて簡単に触れておこうと思います。

Pythonにおけるマルチプロセスとマルチスレッド

Pythonには、CPython、Jython、IronPythonなどの複数の実装がありますが、最も広く使用されているのはC言語をベースにしたCPythonです。

CPythonにおいては、マルチスレッドを使用することは可能ですが、グローバルインタープリタロック(GIL)の存在により、CPUバウンドなタスクにおいてマルチスレッドの利点を最大限に活用できないという制約があります。GILは全てのPython実装に共通する性質ではなく、C言語で実装されたCPython特有のものであることを理解しておくべきです。例えば、Javaで実装されたJythonにはGILは存在しません。

GILの存在により、Pythonオブジェクトへのアクセスは一度に一つのスレッドだけに限られます。そのため、PythonでのCPUバウンドな処理では、マルチスレッドを利用しても真の並列実行が難しく、スレッドがシリアル化されて順次実行されるため、高速化されることはほとんどありません。

マルチプロセスの場合、各プロセスがGILの影響を受けない独自の実行環境を持つため、リソースをより効果的に活用できます。Pythonを使用してマルチコアCPU上でのCPU負荷が高いタスクを実行する場合、マルチプロセスの利用は非常に妥当であると言えます。

また、マルチプロセスでは、各プロセスが独立したメモリ空間を持つため、他のプロセスの影響を受けずに動作することが特徴です。対照的に、マルチスレッドではすべてのスレッドが同一のメモリ空間を共有するため、データの安全性に関する注意が必要です。

しかし、マルチプロセスを利用する際の課題として、プロセス間のデータの交換が必要になることが挙げられます。Pythonにはこのようなプロセス間の通信を助ける強力なツールが提供されており、以降でも紹介していきます。

なお、Pythonでマルチスレッドを実現するthreadingモジュールの使い方については「threadingによるマルチスレッド処理の基本」を参考にしてください。

並行処理や並列処理、I/OバウンドやCPUバウンドとマルチスレッド、マルチプロセスの関係について「並行・並列処理、I/Oバウンド・CPUバウンドを理解する」でまとめていますので興味があれば参考にしてください。

multiprocessingの使い方

基本的な使い方 Process

multiprocessingモジュールを使って、複数のプロセスを生成し、処理を実行する基本的な使い方を以下の例で見ていきましょう。threadingモジュールを使ったことがある人はほとんど同じであることが分かるかと思います。

import logging
import os
import time
from multiprocessing import Process

logging.basicConfig(
    level=logging.DEBUG, format="%(processName)s_%(threadName)s: %(message)s"
)


def myworker1():
    logging.debug(f"start (pid:{os.getpid()})")
    time.sleep(5)
    logging.debug(f"end (pid:{os.getpid()})")


def myworker2():
    logging.debug(f"start (pid:{os.getpid()})")
    time.sleep(5)
    logging.debug(f"end (pid:{os.getpid()})")


def main():
    logging.debug(f"start (pid:{os.getpid()})")

    # プロセスの生成
    process1 = Process(target=myworker1)
    process2 = Process(target=myworker2)

    # プロセスの開始
    process1.start()
    process2.start()

    # プロセスの終了を待機
    process1.join()
    process2.join()

    logging.debug(f"end (pid:{os.getpid()})")


if __name__ == "__main__":
    main()
【実行結果】
MainProcess_MainThread: start (pid:27024)
Process-1_MainThread: start (pid:28944)
Process-2_MainThread: start (pid:18884)
Process-2_MainThread: end (pid:18884)
Process-1_MainThread: end (pid:28944)
MainProcess_MainThread: end (pid:27024)

上記例は、myworker1myworker2という関数を別のプロセスで並列処理するプログラムとなっています。実行してみていただくとProcess-1とProcess-2というプロセスが動作し、それぞれのメインスレッド(MainThread)で処理が動作していることが分かります。以下のイメージ図のような形です。

multiprocessing Process

os.getpidでProcess ID(PID)を取得し表示していますが、それぞれ異なっていることからも別プロセスであることが把握できます。

プロセスの生成は、multiprocessingモジュールのProcessを使います。以下のようにtarget引数に各プロセスで実行する関数を渡します。

    # プロセスの生成
    process1 = Process(target=myworker1)
    process2 = Process(target=myworker2)

各プロセスの終了を待機するのがjoinメソッドです。プロセスが終了したら、joinより下のコードが実行されます。

    # プロセスの終了を待機
    process1.join()
    process2.join()

プロセスの出力では、以下の部分で定義しているようにloggingモジュールを使用しています。loggingformatprocessNamethreadNameを指定すると分かりやすく表示できます。

logging.basicConfig(
    level=logging.DEBUG, format="%(processName)s_%(threadName)s: %(message)s"
)

今回は、それぞれでプロセスでMainThreadが動いていることを示すためにthreadNameを表示しましたが、以降の例では煩雑になるのでprocessNameのみ表示にします。

なお、loggingについては「loggingの基本的な使い方」でまとめていますので興味があれば参考にしてください。

上記のようにProcessを使うことで簡単にマルチプロセスの実装をすることができます。

プロセス名称をつけたい場合

上記例では、プロセス名称(processName)が「Process-1」「Process-2」というようになっていました。プロセスに個別に名称をつけたい場合には、Process生成時に以下のようにname引数で指定します。(変更のない部分は省略します。)

...(省略)...

    # プロセスの生成
    process1 = Process(target=myworker1, name="myworker1_process")
    process2 = Process(target=myworker2, name="myworker2_process")

...(省略)...
【実行結果】
MainProcess: start
myworker1_process: start
myworker2_process: start
myworker1_process: end
myworker2_process: end
MainProcess: end

name引数で指定することで、loggingで出力しているprocessNameの部分は指定した名称に変わります。

プロセスで動作する関数への引数の渡し方

プロセスで動作する関数へ引数を渡したい場合には、Processの生成時に以下のようにargs引数やkwargs引数を使用します。(変更のない部分は省略します)

...(省略)...

def myworker1(x: int, y: int):
    logging.debug("start")
    logging.debug(f"x: {x}, y: {y}")
    time.sleep(5)
    logging.debug("end")

...(省略)...

    # プロセスの生成
    process1 = Process(target=myworker1, args=(10,), kwargs={"y": 20})

...(省略)...
【実行結果】
MainProcess: start
Process-1: start
Process-1: x: 10, y: 20
Process-2: start
Process-1: end
Process-2: end
MainProcess: end

上記例では、myworker1の方の関数に引数としてintx, yがあります。この関数に引数を渡す場合には、argsにタプルで位置引数で指定するか、kwargsに辞書でキーワード引数として指定することができます。argsはタプルなので、1つ引数を渡す場合は上記の(10,)のようにカンマ(,)がいるので注意しましょう。

実行結果を見てもわかるようにProcess-1の方では、メインプロセスから引数として渡した情報を表示できていることが分かります。

マルチスレッドとマルチプロセスのメモリの扱いの違い

冒頭でマルチプロセスは各プロセスが独立したメモリ空間を持つため、他のプロセスの影響を受けずに動作するということを説明しました。対照的に、マルチスレッドではすべてのスレッドが同一のメモリ空間を共有します。

このことを簡単なプログラムで確認してみたいと思います。

マルチプロセスの場合
import logging
from multiprocessing import Process

logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s")


def myworker(data: dict):
    logging.debug("start")
    data["x"] += 1
    logging.debug(f"end: {data}, {id(data)}")


def main():
    logging.debug("start")

    # データを用意
    data = {"x": 0}

    # プロセスを生成
    process1 = Process(target=myworker, args=(data,))
    process2 = Process(target=myworker, args=(data,))

    # プロセスの開始
    process1.start()
    process2.start()

    # プロセスの終了を待機
    process1.join()
    process2.join()

    logging.debug(f"end: {data}, {id(data)}")


if __name__ == "__main__":
    main()
【実行結果】
MainProcess: start
Process-1: start
Process-2: start
Process-1: end: {'x': 1}, 2324205073536
Process-2: end: {'x': 1}, 1441800731136
MainProcess: end: {'x': 0}, 3244999688960

上記例では、dataという辞書を用意して、Process-1、Process-2に渡しています。この時、対象の辞書のidを確認していますが、マルチプロセスでは、MainProcess、Process-1、Process-2それぞれでidが異なっていることが分かります。

Process-1やProcess-2は、開始時点でMainProcessから子プロセスとして生成されます。このような処理をフォーク(fork)と言います。この時点でdataの辞書は{'x': 0}で各プロセスごとでコピーされるような形となるため、各プロセスでは1が加算されて{'x': 1}となっていますが、MainProcessでは{'x':0}のままになります。

このように、マルチプロセスでは各プロセスで独立したメモリ空間を扱います。

マルチスレッドの場合

以下は上記と全く同じコードを、マルチスレッドで実装した場合です。

import logging
from threading import Thread

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def myworker(data: dict):
    logging.debug("start")
    data["x"] += 1
    logging.debug(f"end: {data}, {id(data)}")


def main():
    logging.debug("start")

    # データを用意
    data = {"x": 0}

    # スレッドを生成
    thread1 = Thread(target=myworker, args=(data,))
    thread2 = Thread(target=myworker, args=(data,))

    # スレッドの開始
    thread1.start()
    thread2.start()

    # スレッドの終了を待機
    thread1.join()
    thread2.join()

    logging.debug(f"end: {data}, {id(data)}")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
Thread-1 (myworker): start
Thread-1 (myworker): end: {'x': 1}, 3138390283776
Thread-2 (myworker): start
Thread-2 (myworker): end: {'x': 2}, 3138390283776
MainThread: end: {'x': 2}, 3138390283776

マルチスレッドでも同じように実装していますが、MainThread、Thread-1、Thread-2で対象の辞書のidは全く同じになっていることが分かります。このようにマルチスレッドでは各スレッドが同じメモリ領域を参照します。

実際に上記のプログラムはスレッドセーフではありません。特にdata["x"] +=1という部分は、複数のスレッドから同時にアクセスされると不正な結果を生じる可能性があります。安全にデータの更新を行うにはLock等を使用して排他制御を行うのが適切です。

上記で見たようにマルチプロセスとマルチスレッドのメモリの扱いの違いはよく理解しておくようにしましょう。

プロセスプール Pool

マルチプロセスは、各プロセスが独立したリソースを使用するため子プロセスを大量に生成してしまうことは避けるべきです。マルチプロセスを用いたアプリケーションでは、プロセスプールを構築してリソースの利用を制限するのが良い方法です。

multiprocessingモジュールでは、Poolクラスというプロセスプールのための枠組みを提供してくれています。これは、threadingモジュールには含まれておらず、この点はmultiprocessingの方が便利だと感じる点です。Poolクラスが、複数のプロセスの管理を簡単にしてくれます。

Poolは、以下のようにwith句で使用します。ここでは構成を説明のみとし、具体的な処理は省略しています。

# プロセスプールのサイズ
PROCESS_POOL_SIZE = 3


def myworker():
    ...(マルチプロセスで動作させる処理を記載)...


def main():
    # プロセスプールの生成
    with Pool(PROCESS_POOL_SIZE) as pool:
        ...(処理を記載)...


if __name__ == "__main__":
    main()

上記例では、プロセスプールのサイズを3にしているので、プロセスは同時に3つまで起動します。(処理の記載)の部分では、使用できる便利なメソッドがいくつもあります。以降ではそれらのメソッドを使用したプロセスプールの実装例を紹介していきます。

プールでのプロセス実行 apply_async

Poolを使ったプロセスプールでプロセスを実行する場合には、以下のようにapply_asyncメソッドが使用できます。

import logging
import time
from multiprocessing import Pool

logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s")

# プロセスプールのサイズ
PROCESS_POOL_SIZE = 3


def myworker(x: int):
    logging.debug(f"start")
    time.sleep(5)
    logging.debug("end")

    return x


def main():
    logging.debug("start")

    # プロセスプールの生成
    with Pool(PROCESS_POOL_SIZE) as pool:
        process1 = pool.apply_async(func=myworker, args=(10,))
        process2 = pool.apply_async(func=myworker, args=(20,))
        process3 = pool.apply_async(func=myworker, args=(30,))
        process4 = pool.apply_async(func=myworker, args=(40,))
        process5 = pool.apply_async(func=myworker, args=(50,))

        logging.debug("execute async")

        # 実行結果を取得
        logging.debug(process1.get())
        logging.debug(process2.get())
        logging.debug(process3.get())
        logging.debug(process4.get())
        logging.debug(process5.get())

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainProcess: start
MainProcess: execute async
SpawnPoolWorker-1: start
SpawnPoolWorker-3: start
SpawnPoolWorker-2: start
SpawnPoolWorker-1: end
SpawnPoolWorker-3: end
SpawnPoolWorker-2: end
SpawnPoolWorker-1: start
SpawnPoolWorker-3: start
MainProcess: 10
MainProcess: 20
MainProcess: 30
SpawnPoolWorker-1: end
SpawnPoolWorker-3: end
MainProcess: 40
MainProcess: 50
MainProcess: end

apply_asyncの使い方は、Processと似ていますが、targetに相当する部分がfuncとなっています。asyncは非同期を意味するため、以降の”execute async”という出力が先にされていることが分かります。なお、関数の返却値は、getメソッドで取得できます。

今回プロセスプールのサイズを3としているので、一度に実行されるプロセスは3つまでです。上記結果例では、プロセス1と3が終わり次第、残りの2つプロセスが起動し後続処理が動作していることが分かります。

timeoutの設定

apply_asyncの実行では、以下のようにtimeoutを指定することができます。変更のない部分は省略しています。timeoutは、秒単位で指定し、指定した秒数以内で処理が返ってこない場合は、TimeoutErrorとなります。

...(省略)...
 
       # 実行結果を取得
        logging.debug(process1.get(timeout=1))
        logging.debug(process2.get())
        logging.debug(process3.get())
        logging.debug(process4.get())
        logging.debug(process5.get())

...(省略)...
【実行結果】
MainProcess: start
MainProcess: execute async
SpawnPoolWorker-2: start
SpawnPoolWorker-3: start
SpawnPoolWorker-1: start
Traceback (most recent call last):
...(省略)...
multiprocessing.context.TimeoutError

上記例では、timeout=1としているので1秒以内に処理が返ってくる必要がありますが、myworkerの処理でtime.sleep(5)を入れているので処理は1秒以内には終わりません。この場合は、上記のようにTimeoutErrorとなります。

timeout引数を使うことでタイムアウトを設定しておき、必要に応じて例外処理をするといったことが可能です。

プロセスのブロック apply

apply_asyncに似たメソッドとして、applyメソッドがあります。このメソッドは、終了するまで後続の処理をブロックします。

import logging
import time
from multiprocessing import Pool

logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s")

# プロセスプールのサイズ
PROCESS_POOL_SIZE = 3


def myworker(x: int):
    logging.debug(f"start")
    time.sleep(5)
    logging.debug("end")

    return x


def main():
    logging.debug("start")

    # プロセスプールの生成
    with Pool(PROCESS_POOL_SIZE) as pool:
        # プロセスでブロック
        process = pool.apply(func=myworker, args=(0,))
        logging.debug(process)

        # 上記を実行してから並列実行
        process1 = pool.apply_async(func=myworker, args=(10,))
        process2 = pool.apply_async(func=myworker, args=(20,))
        process3 = pool.apply_async(func=myworker, args=(30,))
        process4 = pool.apply_async(func=myworker, args=(40,))
        process5 = pool.apply_async(func=myworker, args=(50,))

        # 実行結果を取得
        logging.debug(process1.get())
        logging.debug(process2.get())
        logging.debug(process3.get())
        logging.debug(process4.get())
        logging.debug(process5.get())

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainProcess: start
SpawnPoolWorker-1: start
SpawnPoolWorker-1: end
MainProcess: 0
SpawnPoolWorker-2: start
SpawnPoolWorker-3: start
SpawnPoolWorker-1: start
SpawnPoolWorker-1: end
SpawnPoolWorker-3: end
SpawnPoolWorker-2: end
SpawnPoolWorker-1: start
SpawnPoolWorker-3: start
MainProcess: 10
MainProcess: 20
MainProcess: 30
SpawnPoolWorker-1: end
SpawnPoolWorker-3: end
MainProcess: 40
MainProcess: 50
MainProcess: end

applyの使い方は、apply_asyncと似ていますが、applyは同期的に動作し、指定した関数の終了を待ちます。上記実行結果を見ても分かるように、まずapplyで指定した1つのプロセスが実行されて、後続の処理はブロックされます。

このことから、並列処理という観点ではapply_asyncの方が適しています。先にある処理を実行してから後続処理を並列で実行するような、処理の順序付けをしたい場合には、applyを使うことでうまく制御できるかと思います。

イテラブルな引数の使用 map

apply_asyncapplyは一つの引数の組を渡してプロセスを実行しました。例えばリストで保持している引数を順に渡して処理をさせたいような場合には、for文で順次渡すといったことが必要です。しかし、Poolクラスでは、mapメソッドが用意されているので、このようなケースでも以下のようにシンプルに記載できます。

このmapメソッドは、組み込み関数のmapの並列版ともいえるものです。ただし、組み込み関数のmapは複数のイテラブルを指定できるのに対し、Poolmapメソッドはイテラブルな引数は一つのみサポートするという点で違いがあります。

import logging
import time
from multiprocessing import Pool

logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s")

# プロセスプールのサイズ
PROCESS_POOL_SIZE = 3


def myworker(x: int):
    logging.debug(f"start")
    time.sleep(5)
    logging.debug("end")

    return x * 2


def main():
    logging.debug("start")

    values = [i for i in range(1, 6)]
    logging.debug(values)

    # プロセスプールの生成
    with Pool(PROCESS_POOL_SIZE) as pool:
        # mapで各プロセスを実行 ↓でブロックされる
        result = pool.map(func=myworker, iterable=values)

        # 全てのプロセスが完了してから表示
        logging.debug("executed")

        # 実行結果を表示
        logging.debug(result)

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainProcess: start
MainProcess: [1, 2, 3, 4, 5]
SpawnPoolWorker-1: start
SpawnPoolWorker-3: start
SpawnPoolWorker-2: start
SpawnPoolWorker-1: end
SpawnPoolWorker-1: start
SpawnPoolWorker-2: end
SpawnPoolWorker-3: end
SpawnPoolWorker-2: start
SpawnPoolWorker-1: end
SpawnPoolWorker-2: end
MainProcess: executed
MainProcess: [2, 4, 6, 8, 10]
MainProcess: end

上記の例では、myworkerは受け取った値を2倍するようなものになっています。map関数は、apply_asyncapplyとは異なり、イテラブルな引数を受け取り、その各要素を関数に適用します。mapメソッドは、iterableで指定したイテラブルの各要素を順次渡して処理をし、処理結果を返却します。

mapメソッドでは、mapを記載した行で処理がブロックされるため、後続の”executed”という文字は全てのプロセスが完了してから表示されます。

イテラブルな引数の使用(非同期版) map_async

上記例で見たようにmapは後続の処理をブロックするような動作をしました。mapの非同期版ともいえるのが、map_asyncメソッドです。

import logging
import time
from multiprocessing import Pool

logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s")

# プロセスプールのサイズ
PROCESS_POOL_SIZE = 3


def myworker(x: int):
    logging.debug(f"start")
    time.sleep(5)
    logging.debug("end")

    return x * 2


def main():
    logging.debug("start")

    values = [i for i in range(1, 6)]
    logging.debug(values)

    # プロセスプールの生成
    with Pool(PROCESS_POOL_SIZE) as pool:
        # map_asyncで非同期に実行
        result = pool.map_async(func=myworker, iterable=values)

        # 非同期なのですぐ表示される
        logging.debug("execute async")

        # 実行結果を表示
        logging.debug(result.get())

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainProcess: start
MainProcess: [1, 2, 3, 4, 5]
MainProcess: execute async
SpawnPoolWorker-1: start
SpawnPoolWorker-2: start
SpawnPoolWorker-3: start
SpawnPoolWorker-1: end
SpawnPoolWorker-2: end
SpawnPoolWorker-3: end
SpawnPoolWorker-1: start
SpawnPoolWorker-2: start
SpawnPoolWorker-2: end
SpawnPoolWorker-1: end
MainProcess: [2, 4, 6, 8, 10]
MainProcess: end

map_asyncの使い方は、mapと同じですが、map_asyncは非同期であるため”execute async”という文字列ががすぐに表示されていることが分かります。

timeoutの設定

map_asyncの実行では、以下のようにtimeoutを指定することができます。変更のない部分は省略しています。timeoutは、秒単位で指定し、指定した秒数以内で処理が返ってこない場合は、TimeoutErrorとなります。

...(省略)...

    # プロセスプールの生成
    with Pool(PROCESS_POOL_SIZE) as pool:
        # map_asyncで非同期に実行
        result = pool.map_async(func=myworker, iterable=values)

        # 非同期なのですぐ表示される
        logging.debug("execute async")

        # 実行結果を表示
        logging.debug(result.get(timeout=1))

...(省略)...
【実行結果】
MainProcess: start
MainProcess: [1, 2, 3, 4, 5]
MainProcess: execute async
SpawnPoolWorker-1: start
SpawnPoolWorker-2: start
SpawnPoolWorker-3: start
Traceback (most recent call last):
...(省略)...
multiprocessing.context.TimeoutError

上記例では、timeout=1としているので1秒以内に処理が返ってくる必要がありますが、処理でtime.sleep(5)を入れているので処理は1秒以内には終わりません。この場合は、上記のようにTimeoutErrorとなります。

timeout引数を使うことでタイムアウトを設定しておき、必要に応じて例外処理をするといったことが可能です。

イテラブルな引数の使用(遅延評価版) imap

mapmap_asyncと紹介してきましたが、imapという遅延評価版のメソッドもあります。遅延評価とは、必要になるタイミングまで値の評価がされないことを意味します。

import logging
import time
from multiprocessing import Pool

logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s")

# プロセスプールのサイズ
PROCESS_POOL_SIZE = 3


def myworker(x: int):
    logging.debug(f"start")
    time.sleep(5)
    logging.debug("end")

    return x * 2


def main():
    logging.debug("start")

    values = [i for i in range(1, 6)]
    logging.debug(values)

    # プロセスプールの生成
    with Pool(PROCESS_POOL_SIZE) as pool:
        # imapでイテレータを生成
        process_iter = pool.imap(func=myworker, iterable=values)
        # ここではイテレータが取得されるだけ
        logging.debug(process_iter)

        logging.debug("execute")

        # 使用するときにイテレータから読みだして実行 (遅延評価)
        for i in process_iter:
            logging.debug(i)

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainProcess: start
MainProcess: [1, 2, 3, 4, 5]
MainProcess: <multiprocessing.pool.IMapIterator object at 0x000001D3AF5D4110>
MainProcess: execute
SpawnPoolWorker-1: start
SpawnPoolWorker-2: start
SpawnPoolWorker-3: start
SpawnPoolWorker-2: end
SpawnPoolWorker-1: end
SpawnPoolWorker-1: start
SpawnPoolWorker-2: start
MainProcess: 2
MainProcess: 4
SpawnPoolWorker-3: end
MainProcess: 6
SpawnPoolWorker-2: end
SpawnPoolWorker-1: end
MainProcess: 8
MainProcess: 10
MainProcess: end

imapは、mapmap_asyncと使用方法は同じですが、返却値がイテレータとなっています。返却値を表示してみていますが「multiprocessing.pool.IMapIterator object」となっていることからもイテレータであることが分かります。

その後、”execute”という表示をしていますが、このタイミングではプロセスの実行はされていません。具体的にfor文でイテレータから取り出しているタイミングで各プロセスで処理が実行されます。必要なタイミングで処理が動いていることから遅延評価と言います。

プロセス間のデータ交換方法

マルチプロセスでは、複数のプロセスが独立して動作するため、プロセス間のデータ交換の仕組みが必要です。以降では、multiprocessingモジュールでのプロセス間のデータ交換で使用できるQueuePipeについて紹介します。

Queue

キューを用いたプロセス間のデータ制御をするためにmultiprocessingモジュールでは、Queueクラスが用意されています。これはqueueモジュールのQueueと似ていますが、完全に同じように使用できるわけではありません。実際、multiprocessing.Queueはプロセス間の通信に特化しており、queue.Queueはスレッド間の通信を目的としています。また、例外についてはqueueモジュールのqueue.Emptyqueue.Fullといった例外が送出されます。

import logging
import time
from multiprocessing import Process, Queue
from queue import Empty

logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s")

# プロセスプールのサイズ
PROCESS_POOL_SIZE = 3


def myworker(work_queue):
    logging.debug("start")

    # キューが空でない限り繰り返す
    while not work_queue.empty():
        try:
            item = work_queue.get_nowait()
        except Empty:
            break
        else:
            time.sleep(1)
            logging.debug(item)

    logging.debug("end")


def main():
    logging.debug("start")

    # プロセスで共有するキューを用意する
    work_queue = Queue()

    # 表示したい値のリスト
    vals = [i for i in range(10)]
    # キューに投入する
    for val in vals:
        work_queue.put(val)

    # プロセスプール数のプロセスを用意
    processes = [
        Process(target=myworker, args=(work_queue,))
        for _ in range(PROCESS_POOL_SIZE)
    ]

    # プロセスの開始
    for process in processes:
        process.start()

    # 終了を待機
    for process in processes:
        process.join()

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainProcess: start
Process-1: start
Process-3: start
Process-2: start
Process-2: 2
Process-3: 1
Process-1: 0
Process-2: 3
Process-3: 4
Process-1: 5
Process-1: 8
Process-3: 7
Process-2: 6
Process-3: end
Process-2: end
Process-1: 9
Process-1: end
MainProcess: end

上記例は、Poolクラスでのプロセスプールの例をQueueを使って実装したようなものになっています。処理の構成については、threadingにおけるでキューの使用を説明している「キューを用いたスレッド制御」と同じのため、詳細はそちらを参照してください。

ただし、参照先ページのqueue.Queueの例では、task_donejoinといったメソッドがありますが、multiprocessingQueueには、task_donejoinといったメソッドがないので注意してください。

Pipe

各プロセス間でソケットのような双方向の通信チャンネルを作成したい場合には、Pipeを使用します。Pipeは以下のように使用します。

import logging
from multiprocessing import Pipe, Process

logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s")


class CustomClass:
    pass


def myworker(conn):
    logging.debug("child: start")
    
    # 送信データの用意
    data = [1, "string", {"dict": 0}, CustomClass()]
    logging.debug(f"child: {data}")

    # 親プロセスへデータを送信
    conn.send(data)
    # コネクションをクローズ
    conn.close()

    logging.debug("child: end")


def main():
    logging.debug("parent: start")

    # 親コネクションと子コネクションを生成
    # 引数指定しない場合は、デフォルトはPipe(duplex=True)で双方向になっている
    # Pipe(duplex=False)にするとparrent_conn:受信専用、child_conn:送信専用になる
    parent_conn, child_conn = Pipe()

    child_process = Process(target=myworker, args=(child_conn,))

    # 子プロセスの開始
    child_process.start()

    # 子プロセスからのデータを受信
    data = parent_conn.recv()
    logging.debug(f"parent: {data}")

    # プロセスの終了を待機
    child_process.join()

    logging.debug("parent: end")


if __name__ == "__main__":
    main()
【実行結果】
MainProcess: parent: start
Process-1: child: start
Process-1: child: [1, 'string', {'dict': 0}, <__mp_main__.CustomClass object at 0x000001A0762E9750>]
Process-1: child: end
MainProcess: parent: [1, 'string', {'dict': 0}, <__main__.CustomClass object at 0x00000225FF32EE90>]
MainProcess: parent: end

Pipeを使用する際には「parent_conn, child_conn = Pipe()」といったように生成します。parent_connは親コネクションで、child_connは子コネクションを表しています。

子プロセスを生成する際に、child_connを引数として渡すことで親プロセスとの通信ができるようになります。

def myworker(conn):
    logging.debug("child: start")
    
    # 送信データの用意
    data = [1, "string", {"dict": 0}, CustomClass()]
    logging.debug(f"child: {data}")

    # 親プロセスへデータを送信
    conn.send(data)
    # コネクションをクローズ
    conn.close()

    logging.debug("child: end")

子プロセスでは上記のように受け取ったコネクションを使用して、sendメソッドで親プロセスにデータを送信できます。コネクションは不要になったらcloseするようにしましょう。

    # 子プロセスからのデータを受信
    data = parent_conn.recv()
    logging.debug(f"parent: {data}")

親プロセス側では、上記の部分で子プロセスからのデータを受信します。データの受信では、親コネクションのrecvメソッドを使用します。これにより子プロセスからのデータを受信できます。

このようにPipeを使うことで親プロセスと子プロセス間でデータのやり取りができます。なお、Pipe生成時に引数を指定しないデフォルトの場合、デフォルトはPipe(duplex=True)で双方向通信できるようになっています。そのため、parent_connの方からsendメソッドを使うこともできます。

しかし、Pipe(duplex=False)にするとparrent_connは受信専用、child_connは送信専用となります。通信方向を制限したい場合には使用を検討するとよいでしょう。もし、duplex=Falseにした状態で、parent_conn.sendを実行しようとすると以下のように例外となります。

Traceback (most recent call last):
...(省略)...
OSError: connection is read-only

なお、上記の例ではカスタムクラスのオブジェクトをやり取りするようになっていますが、この場合、オブジェクトがpickle可能であることが必要です。オブジェクトがpickle可能であるというのは、pickleモジュールを使用してシリアル化とデシリアル化が可能であることを意味し、オブジェクトにより可否が分かれるため注意しましょう。

プロセス間におけるデータの共有

マルチプロセス間でデータをやり取りするQueuePipeについて見てきました。プロセス間でデータをやり取りする方法としてもう一つプロセス間で共有される専用メモリを使う方法があります。以降では共有メモリを使うための仕組みをいくつか紹介します。

マルチプロセスでは、プロセス生成時のオーバーヘッドがかかったり、データを交換する手間が発生したりしますが、メモリ空間を共有しないことは誤ってメモリのデータを破壊してしまう危険性が減るため利点と言えます。

そのため、基本的にはマルチプロセスでは、以降で紹介するデータ共有の仕組みは使わない方がいいと思います。どうしてもデータを共有しなければならないケースなどでは十分注意して使うようにしましょう。

Value, Array

プロセス間でデータを共有する簡単な方法としては、ValueArrayを使う方法があります。これらのオブジェクトは共有ctypesオブジェクト(shared ctypes)と言い、プロセス間で共有されるオブジェクトになります。

import logging
from multiprocessing import Array, Process, Value

logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s")


def myworker(num, arr):
    logging.debug("start")
    logging.debug(num)
    logging.debug(arr)

    # 値を変更
    num.value += 1.0
    for i in range(len(arr)):
        arr[i] *= 2

    # 値の表示
    logging.debug(num.value)
    logging.debug(arr[:])

    logging.debug("end")


def main():
    logging.debug("start")

    # "d":double(倍精度浮動小数) "i":sined int(符号付き整数) 他は以下を参照
    # https://docs.python.org/ja/3/library/array.html#module-array
    num = Value("d", 0.0)
    arr = Array("i", range(5))
    # 値の表示
    logging.debug(num.value)
    logging.debug(arr[:])

    # プロセスを生成
    p = Process(target=myworker, args=(num, arr))

    # プロセスの開始
    p.start()

    # プロセスの終了を待機
    p.join()

    # 値の表示
    logging.debug(num.value)
    logging.debug(arr[:])

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainProcess: start
MainProcess: 0.0
MainProcess: [0, 1, 2, 3, 4]
Process-1: start
Process-1: <Synchronized wrapper for c_double(0.0)>
Process-1: <SynchronizedArray wrapper for <multiprocessing.sharedctypes.c_long_Array_5 object at 0x0000017E77500DD0>>
Process-1: 1.0
Process-1: [0, 2, 4, 6, 8]
Process-1: end
MainProcess: 1.0
MainProcess: [0, 2, 4, 6, 8]
MainProcess: end

ValueArrayの使い方は簡単で、以下のように生成して使います。

    num = Value("d", 0.0)
    arr = Array("i", range(5))
    # 値の表示
    logging.debug(num.value)
    logging.debug(arr[:])

最初の引数に指定している”d”"i"は、"d"がdouble(倍精度浮動小数)、"i"がsigned int(符号付き整数)といった型コードを表しています。型コードの指定はこちらのページを参考にしてください。

子プロセス側で変更を加えているのは以下の部分です。

    # 値を変更
    num.value += 1.0
    for i in range(len(arr)):
        arr[i] *= 2

Valueの値の変更ではnum.valueに対して操作します。また、Arrayの方はfor文で上記のように各要素にアクセスする必要があります。

上記結果を見てもわかるようにValueArrayは共有されているため、子プロセスで変更を加えた内容を親プロセス側でも直接確認できていることが分かります。

Manager

ValueArrayといったデータ共有の仕組みを紹介しましたが、少しPythonらしくなく使いにくい印象を受けたかと思います。

multiprocessingモジュールには、共有データを扱いやすくするためのManagerというものがあります。Managerはサーバープロセスを管理するためのものとなっています。Managerの使い方を以下の例で見てみましょう。

import logging
from multiprocessing import Manager, Process

logging.basicConfig(level=logging.DEBUG, format="%(processName)s: %(message)s")


def myworker(tmp_l, tmp_d, tmp_n):
    logging.debug("start")
    tmp_l.reverse()
    tmp_d["x"] += 1
    tmp_n.y *= 2

    logging.debug(tmp_l)
    logging.debug(tmp_d)
    logging.debug(tmp_n)

    logging.debug("end")


def main():
    logging.debug("start")

    with Manager() as manager:
        tmp_l = manager.list()
        tmp_d = manager.dict()
        tmp_n = manager.Namespace()

        # 値を設定
        tmp_l.append(1)
        tmp_l.append(2)
        tmp_l.append(3)
        tmp_d["x"] = 0
        tmp_n.y = 1

        # 実行前の値
        logging.debug(tmp_l)
        logging.debug(tmp_d)
        logging.debug(tmp_n)

        p = Process(target=myworker, args=(tmp_l, tmp_d, tmp_n))
        p.start()
        p.join()

        # 実行後の値
        logging.debug(tmp_l)
        logging.debug(tmp_d)
        logging.debug(tmp_n)

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainProcess: start
MainProcess: [1, 2, 3]
MainProcess: {'x': 0}
MainProcess: Namespace(y=1)
Process-2: start
Process-2: [3, 2, 1]
Process-2: {'x': 1}
Process-2: Namespace(y=2)
Process-2: end
MainProcess: [3, 2, 1]
MainProcess: {'x': 1}
MainProcess: Namespace(y=2)
MainProcess: end

Managerは、以下の部分のようにwith句を使って使用します。

    with Manager() as manager:
        tmp_l = manager.list()
        tmp_d = manager.dict()
        tmp_n = manager.Namespace()

        # 値を設定
        tmp_l.append(1)
        tmp_l.append(2)
        tmp_l.append(3)
        tmp_d["x"] = 0
        tmp_n.y = 1

上記例では、tmp_lがリスト、tmp_dが辞書で通常のリストや辞書のようにデータの操作ができます。tmp_nはNamespaceというもので「.(ドット)」を使ってクラスのプロパティのように値の操作ができます。

あとはプロセスの引数としてこれらの変数を渡してあげることで、プロセス間でデータ共有が簡単にできます。

ValueArrayに比べてPythonのオブジェクトのように扱うことができるため便利です。ただし、注意点としてValueArrayに比べて処理が遅いという特徴がありますので、使いどころをについてはよく検討してもらえると良いかと思います。

異なるサーバー間でのデータ共有

Managerは、ネットワーク越しで異なるサーバー間でデータ共有することも可能です。以下の例で見てみましょう。

server.py

import queue
from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    pass


def main():
    q = queue.Queue()

    # マネージャーで呼び出し可能なオブジェクト登録
    QueueManager.register("get_queue", callable=lambda: q)

    # マネージャーを取得
    # address: マネージャープロセスが新たなコネクションを待ち受けるアドレス
    # authkey: サーバープロセスへのコネクションを検証するための認証キー
    manager = QueueManager(address=("localhost", 50000), authkey=b"P@ssw0rd")

    # サーバーを取得
    server = manager.get_server()
    # サーバーを起動する
    server.serve_forever()


if __name__ == "__main__":
    main()

client1.py(データをキューに登録)

from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    pass


def main():
    # オブジェクト登録
    QueueManager.register("get_queue")

    # マネージャーを取得
    manager = QueueManager(address=("localhost", 50000), authkey=b"P@ssw0rd")

    # サーバーへ接続
    manager.connect()
    queue = manager.get_queue()
    queue.put("hello")


if __name__ == "__main__":
    main()

client2.py(データをキューから取り出す)

from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    pass


def main():
    # オブジェクト登録
    QueueManager.register("get_queue")

    # マネージャーを取得
    manager = QueueManager(address=("localhost", 50000), authkey=b"P@ssw0rd")

    # サーバーへ接続
    manager.connect()
    queue = manager.get_queue()
    print(queue.get())


if __name__ == "__main__":
    main()
【実行結果】()内は手順の補足
(1. サーバーの立ち上げ)
> python server.py

(2. 別のターミナルを立ち上げて実行: "hello"がキューにputされる)
> python client1.py

(3. 別のターミナルを立ち上げて実行: "hello"がキューからgetされて表示される。キューに値がない場合は待ち状態になり、client1でputされたら表示される)
> python .\multi_client2.py 
hello

上記例では、3つのプログラムを作っています。server.pyがサーバーで動作し、client1.pyがデータをキューに登録し、client2.pyがキューからデータを取り出すクライアントとして動いているようなイメージを持ってもらえればと思います。

server.pyの内容

まずは、server.pyの内容から見ていきましょう。

from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    pass

multiprocessing.managersからBaseManagerをインポートし、BaseManagerを継承したQueueManagerというクラスを作成しています。

def main():
    q = queue.Queue()

    # マネージャーで呼び出し可能なオブジェクト登録
    QueueManager.register("get_queue", callable=lambda: q)

    # マネージャーを取得
    # address: マネージャープロセスが新たなコネクションを待ち受けるアドレス
    # authkey: サーバープロセスへのコネクションを検証するための認証キー
    manager = QueueManager(address=("localhost", 50000), authkey=b"P@ssw0rd")

    # サーバーを取得
    server = manager.get_server()
    # サーバーを起動する
    server.serve_forever()

メイン関数では、queueモジュールからQueueを生成し、registerメソッドで、呼び出し可能なものとして登録をしています。これにより、manager.get_queue()でキューを取り出すことができます。

    manager = QueueManager(address=("localhost", 50000), authkey=b"P@ssw0rd")

QueueManagerの生成時には、addressを使ってコネクションを待ち受けるアドレスとポートを指定し、authkeyで認証キーを設定します。これらの値をclient側で用いることでアクセスできるようになります。今回は、自端末で確認できるようにlocalhostとしていますが、実際にはサーバーのアドレスを入力します。

    # サーバーを取得
    server = manager.get_server()
    # サーバーを起動する
    server.serve_forever()

あとは、サーバーを取得、起動します。serve_foreverを実行することでサーバー機能が常時動く状態となります。

client1.pyの内容

次にデータをキューに登録するclient1.pyの処理を見てみます。作りはserver.pyに非常に似ており、クラス定義やオブジェクト登録、マネージャーの取得の部分は同じです。

    # サーバーへ接続
    manager.connect()
    queue = manager.get_queue()
    queue.put("hello")

サーバーへ接続しているところが上記の部分ですが、connectを使うことでサーバーに接続します。get_queueによりキューを取得できるので、キューに”hello”という文字列をputで登録しています。

client2.pyの内容

最後に、キューから値を取り出すclient2.pyの処理ですが、client1.pyとほとんど同じです。

    # サーバーへ接続
    manager.connect()
    queue = manager.get_queue()
    print(queue.get())

キューの操作では、putではなくgetで値を取得している点がclient1と異なります。

処理の実行手順

動作の確認の手順についてです。まずは、server1.pyを実行します。serve_foreverでサーバーが起動状態となります。

次に別のプロンプトを開いてclient1.pyを実行します。これによりキューに”hello”という文字列が登録されます。

最後にまた別のターミナルを開いてclient2.pyを実行します。すると、client1.pyがキューに登録した”hello”という文字列が表示されます。

複数回client2.pyを実行すると待ち状態になります。これは、キューに値が入らないので待っている状態です。その状態で、client1.pyを実行すると、client2.py側ですぐに”hello”と表示されます。

今回の実行手順は自端末(localhost)での確認方法で紹介しましたが、それぞれサーバー、クライアントで実行することで異なるサーバー間でもデータ共有が可能です。

まとめ

Pythonでマルチプロセスを実装するためのmultiprocessingモジュールの使い方について解説しました。

並列処理を実装する場合には、マルチスレッドとマルチプロセスという選択肢が考えられます。Pythonにおいてはグローバルインタープリタロック(GIL)の影響でマルチスレッドではCPUバウンドな処理では十分な効果を発揮できない可能性があります。

マルチプロセスの場合、各プロセスがGILの影響を受けない独自の実行環境を持つため、リソースをより効果的に活用できます。Pythonを使用してマルチコアCPU上でのCPU負荷が高いタスクを実行する場合、マルチプロセスの利用は非常に妥当であると言えます。

multiprocessingモジュールでは、マルチプロセスを実行するためにプロセスプール(Pool)やデータ交換方法(QueuePipe)・データ共有方法(ValueArrayManager)といった便利な機能を提供してくれます。

並列処理は難しいですが、ぜひmultiprocessingモジュールの使い方を覚えてもらえるといいかと思います。

Note

multiprocessingの公式ドキュメントはこちらを参照してください。