threading

【Python】threadingによるマルチスレッド処理の基本

【Python】threadingによるマルチスレッド処理の基本

Pythonでマルチスレッド処理を行うためのthreadingモジュールの使い方について解説します。

Pythonによるマルチスレッド処理 threading

マルチスレッド処理とは、同一のメモリ空間内で、複数のスレッドで並列処理することを言います。Pythonには、マルチスレッド処理のためのthreadingモジュールが提供されています。

Pythonの実装には、CPython、Jython、IronPythonなどの複数の実装がありますが、最も広く使用されているPythonの実装は、C言語をベースにしたCPythonです。

CPythonの実装には、マルチスレッドを多くの状況で効果的に使用できないという大きな制限が存在します。これは、グローバルインタープリタロック(GIL)として知られています。GILは、全てのPython実装に共通する性質ではなく、C言語で実装されたCPython特有のものであることを理解しておくべきです。例えば、Javaで実装されたJythonにはGILは存在しません。

GILの存在下で、Pythonオブジェクトにアクセスする際の処理は、このグローバルなロックにより、1つのスレッドに限定されます。そのため、PythonではCPUバウンドな処理において、マルチスレッドを利用しても、スレッドがシリアル化されて順次実行されるため、高速化は期待しづらいです。この場合、マルチスレッドではなく、マルチプロセスの実装を検討した方が良い場合があります。

しかし、Pythonのマルチスレッド処理が全く意味がないというわけではありません。GILを避けるケースもあり、I/Oバウンドな処理、例えばDBからの応答を待つような場面では、マルチスレッドが効果的です。このようなケースでは、マルチスレッド処理の知識は非常に価値があります。

本記事では、threadingモジュールを用いたマルチスレッド処理の基本について紹介します。

なお、Pythonでマルチプロセスを実現するmultiprocessingモジュールの使い方については「multiprocessingによるマルチプロセスの基本」を参考にしてください。

並行処理や並列処理、I/OバウンドやCPUバウンドとマルチスレッド、マルチプロセスの関係について「並行・並列処理、I/Oバウンド・CPUバウンドを理解する」でまとめていますので興味があれば参考にしてください。

threadingの使い方

基本的な使い方 Thread

threadingモジュールを使って、複数のスレッドを生成し、処理を実行する基本的な使い方を以下の例でみていきましょう。

import logging
import time
from threading import Thread

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def myworker1():
    logging.debug("start")
    time.sleep(5)
    logging.debug("end")


def myworker2():
    logging.debug("start")
    time.sleep(5)
    logging.debug("end")


def main():
    logging.debug("start")

    # スレッドの生成
    thread1 = Thread(target=myworker1)
    thread2 = Thread(target=myworker2)

    # スレッドの開始
    thread1.start()
    thread2.start()

    # スレッドの終了を待機
    thread1.join()
    thread2.join()

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
Thread-1 (myworker1): start
Thread-2 (myworker2): start
Thread-1 (myworker1): end
Thread-2 (myworker2): end
MainThread: end

上記例は、myworker1myworker2という関数を別のスレッドで並列処理するプログラムになっています。実行してみていただくとThread-1とThread-2が動くことが分かるかと思います。上記のプログラムでのスレッドを簡単に絵に書いてみると以下のようになっています。

Python threading Thread

スレッドの生成は、threadingモジュールのThreadを使います。以下のようにtarget引数に各スレッドで実行する関数を渡します。

    # スレッドの生成
    thread1 = Thread(target=myworker1)
    thread2 = Thread(target=myworker2)

スレッドの開始は、それぞれでstartメソッドを実行します。

    # スレッドの開始
    thread1.start()
    thread2.start()

各スレッドの終了を待機するのがjoinメソッドです。スレッドが終了したら、joinより下のコードが実行されます。

    # スレッドの終了を待機
    thread1.join()
    thread2.join()

スレッド内の出力では以下の部分で定義しているようにloggingを使用しています。loggingformatthreadNameを指定すると分かりやすく表示できます。

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")

loggingについては「loggingの基本的な使い方」でまとめていますので興味があれば参考にしてください。

上記のようにThreadを使うことで簡単にマルチスレッド処理を書いていくことができます。

スレッド名称をつけたい場合

上記例では、スレッド名称(threadName)が「Thread-1」「Thread-2」というようになっていました。スレッドに個別に名称をつけたい場合には、Thread生成時に以下のようにname引数で指定します。(変更のない部分は省略します)

...(省略)...

    # スレッドの生成
    thread1 = Thread(target=myworker1, name="myworker1")
    thread2 = Thread(target=myworker2, name="myworker2")

...(省略)...
【実行結果】
MainThread: start
myworker1: start
myworker2: start
myworker1: end
myworker2: end
MainThread: end

name引数で指定することで、loggingで出力しているthreadNameの部分は指定した名称に変わります。

スレッドで動作する関数への引数の渡し方

スレッドで動作する関数へ引数を渡したい場合には、Threadの生成時に以下のようにargs引数とkwargs引数を使用します。(変更のない部分は省略します)

...(省略)...

def myworker1(x: int, y: int):
    logging.debug("start")
    logging.debug(f"x: {x}, y:{y}")
    time.sleep(5)
    logging.debug("end")


...(省略)...

    # スレッドの生成
    thread1 = Thread(target=myworker1, args=(10,), kwargs={"y": 20})

...(省略)...
【実行結果】
MainThread: start
Thread-1 (myworker1): start
Thread-1 (myworker1): x: 10, y:20
Thread-2 (myworker2): start
Thread-1 (myworker1): end
Thread-2 (myworker2): end
MainThread: end

上記例では、myworker1の方の関数に引数としてintx, yがあります。この関数に引数を渡す場合には、argsにタプルで位置引数で指定するか、kwargsに辞書でキーワード引数として指定することができます。argsはタプルなので、1つ引数を渡す場合は上記の(10,)のようにカンマ(,)がいるので注意しましょう。

実行結果を見てもわかるようにThread-1の方では、メインスレッドから引数として渡した情報を表示できていることが分かります。

デーモンとしてのスレッド実行方法

スレッドを「デーモンスレッド」にする場合には、以下のようにdaemon引数にTrueを設定します。デフォルトはFalseです。

デーモンとは、メインメモリ上に常駐して機能を提供するプログラムのことで、UNIX系OSで一般的な呼び方です。デーモンでない生存中のスレッドが全てなくなるとPythonプログラムは終了します。

import logging
import time
from threading import Thread

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def myworker1():
    logging.debug("start")
    time.sleep(10)
    logging.debug("end")


def myworker2():
    logging.debug("start")
    time.sleep(5)
    logging.debug("end")


def main():
    logging.debug("start")

    # スレッドの生成
    # thread1をデーモンに設定
    thread1 = Thread(target=myworker1, daemon=True)
    # thread2はそのまま
    thread2 = Thread(target=myworker2)

    # スレッドの開始
    thread1.start()
    thread2.start()

    # スレッドの終了まで待機
    # thread1.join()
    thread2.join()

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
Thread-1 (myworker1): start
Thread-2 (myworker2): start
Thread-2 (myworker2): end
MainThread: end

上記例では、Thread-1の方のみdaemon引数をTrueにしています。daemonは、以下のようにThread生成後にdaemonプロパティにTrueを設定しても構いません。

    thread1 = Thread(target=myworker1)
    thread1.daemon = True

これまでの例と少し変わっている点としては、myworker1sleep時間を10sとして、myworker2よりも後に終わるようにしていることです。また、thread1.join()をコメントアウトしています。

結果を見てみると分かる通り、デーモンプロセスであるThread-1が終了する前にMainThreadが終了していることが分かります。これは、Thread-2が終了した時点でデーモンでない生存中のスレッドがなくなったためです。

なお、join()を記載するとスレッド終了まで待機するため、コメントアウトしているthread1.join()のコメントアウトを外すと以下のようにThread-1が終了してからMainThreadが終了します。

【実行結果】
MainThread: start
Thread-1 (myworker1): start
Thread-2 (myworker2): start
Thread-2 (myworker2): end
Thread-1 (myworker1): end
MainThread: end

タイマーを用いたスレッド実行

threadingモジュールのTimerクラスを使用することで、指定した時間遅延の後に関数を実行するスレッドを生成することができます。具体的な使用方法は以下の通りです。

import logging
import time
from threading import Timer

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def myworker(x: int, y: int):
    logging.debug("start")
    time.sleep(5)
    logging.debug("end")


def main():
    logging.debug("start")

    # タイマーで5秒後に実行開始
    thread = Timer(5, myworker, args=(10,), kwargs={"y": 20})

    # スレッドの開始
    thread.start()

    # スレッドの終了まで待機
    thread.join()

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
Thread-1: start
Thread-1: end
MainThread: end

Timerは、Threadのサブクラスであるため、使用方法は非常に似ています。主な違いは、最初の引数としてタイマーを指定する点です。このタイマーは遅延を秒単位で指定します。関数への引数やキーワード引数の渡し方はThreadと全く同じです。

上記例では、「MainThread: start」が表示された後、指定された5秒の遅延が発生し、「Thread-1: start」が表示され、その後の処理が進むことが確認できます。

スレッドの排他制御

マルチスレッド処理では、スレッド間でメモリを共有するためしっかりと排他制御をしないと思わぬ不具合を引き起こします。以下の簡単な例で問題が起こる場合を見てみましょう。

import logging
import time
from threading import Thread

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def mycounter(d):
    logging.debug("start")

    tmp = d["x"]
    time.sleep(5)
    d["x"] = tmp + 1

    logging.debug(f"end {d}")


def main():
    logging.debug("start")
    data = {"x": 0}

    # スレッドの生成
    thread_num = 5
    threads = [
        Thread(target=mycounter, args=(data,)) for _ in range(thread_num)
    ]

    # スレッドの開始
    for thread in threads:
        thread.start()

    # スレッドの終了まで待機
    for thread in threads:
        thread.join()

    logging.debug(data)
    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
Thread-1 (mycounter): start
Thread-2 (mycounter): start
Thread-3 (mycounter): start
Thread-4 (mycounter): start
Thread-5 (mycounter): start
Thread-1 (mycounter): end {'x': 1}
Thread-2 (mycounter): end {'x': 1}
Thread-4 (mycounter): end {'x': 1}
Thread-3 (mycounter): end {'x': 1}
Thread-5 (mycounter): end {'x': 1}
MainThread: {'x': 1}
MainThread: end

上記例では、"x"というキーで値が0という要素を持っているdataという辞書を用意しています。スレッドを5つ用意し、mycounter関数へdataを渡すことで処理の実行カウントを取ろうとしています。結果としては5になってもらいたいと想定しましたが、結果として{'x': 1}という形になってしまっています。

この場合、5つのスレッドは「thread.start()」でほぼ同じタイミングで開始しているわけですが、その際にローカル変数のtmpd["x"]の値を退避してしまっています。このタイミングでは、5つのスレッドではいずれもtmp=0です。その後、5秒の処理後にtmp1を足した値を"x"のキーの値として更新しているため、すべてのスレッドで1になってしまっています。

これは複数スレッドが同じ辞書に同時にアクセスしてしまっているために発生しています。このような状況を競合状態(race condition)と言います。マルチスレッド処理ではこのような状況にならないように、うまく排他制御をする必要があります。

threadingモジュールでは、排他制御として実行中は他スレッドが入ってこれないようにする仕組みが用意されています。以下でthreadingモジュールで提供されているロック(LockRLock)とセマフォ(Semaphore)について紹介していきます。

ロックの取得 Lock

上記例を用いつつ、threadingモジュールのLockを使ってロックを取得する方法を紹介します。Lockは以下の例のように使用します。

import logging
import time
from threading import Lock, Thread

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def mycounter(d: dict, lock: Lock):
    # ロックを取得
    with lock:
        logging.debug("start")

        tmp = d["x"]
        time.sleep(1)
        d["x"] = tmp + 1

        logging.debug(f"end {d}")

    # # with句を使わない場合の以下と同じ
    # # ロックを取得
    # lock.acquire()
    # # 以下のブロックがロックされる
    # tmp = d["x"]
    # time.sleep(1)
    # d["x"] = tmp + 1
    # # ロックをリリース
    # lock.release()


def main():
    logging.debug("start")
    data = {"x": 0}

    # ロックを生成
    lock = Lock()

    # スレッドの生成
    thread_num = 5
    threads = [
        Thread(target=mycounter, args=(data, lock)) for _ in range(thread_num)
    ]

    # スレッドの開始
    for thread in threads:
        thread.start()

    # スレッドの終了まで待機
    for thread in threads:
        thread.join()

    logging.debug(data)
    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
Thread-1 (mycounter): start
Thread-1 (mycounter): end {'x': 1}
Thread-2 (mycounter): start
Thread-2 (mycounter): end {'x': 2}
Thread-3 (mycounter): start
Thread-3 (mycounter): end {'x': 3}
Thread-4 (mycounter): start
Thread-4 (mycounter): end {'x': 4}
Thread-5 (mycounter): start
Thread-5 (mycounter): end {'x': 5}
MainThread: {'x': 5}
MainThread: end

上記実行してみると、スレッドが順にロックを取得し、ロックを保持しているスレッドが処理を行っている間、他のスレッドはロックの解放を待機することが分かります。

ロックを取得するためには、threadingモジュールのLockクラスを使用します。main関数の以下部分で、Lockクラスのインスタンスを生成し、各スレッドに引数として渡しています。

...(省略)...
 
   data = {"x": 0}

    # ロックを生成
    lock = Lock()

    # スレッドの生成
    thread_num = 5
    threads = [
        Thread(target=mycounter, args=(data, lock)) for _ in range(thread_num)
    ]

...(省略)...

具体的にロックを取得しているのは以下の部分で、Lockのインスタンスでwith句を使っています。このようにするとwith句の中では、ロックが取得され、ブロック内の処理が終わるとロックが解放されます。

    # ロックを取得
    with lock:
        logging.debug("start")

        tmp = d["x"]
        time.sleep(1)
        d["x"] = tmp + 1

        logging.debug(f"end {d}")

withを使わないで以下のように書くことも可能です。acquireメソッドでロックを取得し、releaseメソッドでロックを解放します。

    # ロックを取得
    lock.acquire()
    # 以下のブロックがロックされる
    tmp = d["x"]
    time.sleep(1)
    d["x"] = tmp + 1
    # ロックをリリース
    lock.release()

特に何か理由がない限りは自動でロックの解放までしてくれるwith句を使うのが適切かと思いますが、明示的にロックの取得や解放ができることは覚えておいてください。

以上のようにLockを使うことで、複数スレッドを排他制御しながら処理を進めることができます。

入れ子のロック取得 RLock

Lockを使うことでロックを取得することができました。しかし、同じスレッド内でLockを複数回取得しようとすると、デッドロックが発生して処理が止まってしまいます。この問題を解決するために、入れ子上にロックを取得したい場合はRLockを使用します。

threadingモジュールのRLockクラスを使って、入れ子上のロックを取得する方法を以下に示します。

import logging
import time
from threading import RLock, Thread

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def mycounter(d: dict, lock: RLock):
    # ロックを取得
    with lock:
        tmp = d["x"]
        time.sleep(1)
        d["x"] = tmp + 1
        # ロック内で再度ロックを取得する
        with lock:
            tmp = d["x"]
            time.sleep(1)
            d["x"] = tmp + 1


def main():
    logging.debug("start")
    data = {"x": 0}

    # ロックを生成
    lock = RLock()

    # スレッドの生成
    thread_num = 5
    threads = [
        Thread(target=mycounter, args=(data, lock)) for _ in range(thread_num)
    ]

    # スレッドの開始
    for thread in threads:
        thread.start()
    # 終了を待機
    for thread in threads:
        thread.join()

    logging.debug(data)
    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
Thread-1 (mycounter): start
Thread-1 (mycounter): end {'x': 2}
Thread-2 (mycounter): start
Thread-2 (mycounter): end {'x': 4}
Thread-3 (mycounter): start
Thread-3 (mycounter): end {'x': 6}
Thread-4 (mycounter): start
Thread-4 (mycounter): end {'x': 8}
Thread-5 (mycounter): start
Thread-5 (mycounter): end {'x': 10}
MainThread: {'x': 10}
MainThread: end

RLockの使用方法はLockとほとんど同じですが、同一スレッド内で複数回ロックを取得してもデッドロックが発生しない点が異なります。

このような入れ子状のロックでは、RLockが連続してロックを取得し、リリースを行ってくれるため、処理が継続できます。この特性はLockにはありませんので、注意が必要です。

セマフォ Semaphore

ロックの他に排他制御の仕組みとしてセマフォがあります。セマフォは、特定のリソース上で同時に動作できるプロセスやスレッドの数を制御するもので、セマフォにより同時実行数を制限できます。

Pythonのthreadingモジュールでは、このセマフォ機能をSemaphoreクラスを使って実現できます。以下の例では、仮想的にデータベースへのアクセス数をセマフォを用いて一定数に制限する例を示しています。

import logging
import sqlite3
from threading import Semaphore, Thread

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def dbaccess(dbname: str, sema: Semaphore):
    # セマフォを使用
    with sema:
        logging.debug("start")

        # DB接続
        conn = sqlite3.connect(dbname)
        curs = conn.cursor()

        # DBを検索
        select_all_sql = "SELECT * FROM person"
        curs.execute(select_all_sql)
        rows = curs.fetchall()

        # DBをクローズ
        curs.close()
        conn.close()
        logging.debug(f"end: {rows}")


def main():
    logging.debug("start")

    # データベース名
    dbname = "test.db"
    # 最大コネクション数
    max_connections = 3

    # セマフォを生成
    sema = Semaphore(max_connections)

    # スレッドの生成
    thread_num = 5
    threads = [
        Thread(target=dbaccess, args=(dbname, sema)) for _ in range(thread_num)
    ]

    # スレッドの開始
    for thread in threads:
        thread.start()
    # 終了を待機
    for thread in threads:
        thread.join()

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
Thread-1 (dbaccess): start
Thread-2 (dbaccess): start
Thread-3 (dbaccess): start
Thread-2 (dbaccess): end: [(1, 'TARO', 30)]
Thread-4 (dbaccess): start
Thread-3 (dbaccess): end: [(1, 'TARO', 30)]
Thread-5 (dbaccess): start
Thread-1 (dbaccess): end: [(1, 'TARO', 30)]
Thread-4 (dbaccess): end: [(1, 'TARO', 30)]
Thread-5 (dbaccess): end: [(1, 'TARO', 30)]
MainThread: end

上記の例では、test.dbという仮想的なsqliteのDBを用意して、Semaphoreを使って最大コネクション数を3に制御し、それぞれのスレッドでSELECTを実行しています。

生成しているスレッド数は5ですが、実行を開始するとまずThread-1, Thread-2, Thread-3の3つのスレッドが開始します。その後、Thread-2が終了するとThread-4が開始し、Thread-3が終了するとThread-5が開始します。このように、同時に起動しているスレッド数は3に制限されています。

Semaphoreの使用方法は、外見上はLockRLockの使い方と似ていますが、動作としては異なります。Semaphoreは同時実行数を制御するのに対し、LockRLockは排他制御を行います。特に、Semaphoreの生成時に同時実行の最大数を指定する点は、その特性を示す重要な部分です。

    # 最大コネクション数
    max_connections = 3

    # セマフォを生成
    sema = Semaphore(max_connections)

セマフォは、同時実行数に制限が必要な場面、特にリソースに容量の制限がある場面で使用されることが多いです。

有限セマフォ BoundedSemaphore

通常のSemaphoreでは、acquirereleaseを使ってセマフォの数を操作できます。aquireが実行されるとセマフォの数が減少し、releaseが実行されるとセマフォの数が増加することで同時実行数を制御しています。

通常のSemaphoreは、セマフォ値が0を下回ることができない一方で、初期値を超えることも許容されています。つまり、releaseを複数回実行すると、セマフォの値が初期値を超えることができます。

from threading import Semaphore

# 最大数(初期値)
max_sema = 2

# セマフォを生成
sema = Semaphore(max_sema)
# 初期値 value=2
print(sema)

# value=1
sema.acquire()
print(sema)
# value=0
sema.acquire()
print(sema)

# value=1
sema.release()
print(sema)
# value=2
sema.release()
print(sema)
# value=3 ← 初期値の最大を超える
sema.release()
print(sema)
【実行結果】
<threading.Semaphore at 0x1e4062b5790: value=2>
<threading.Semaphore at 0x1e4062b5790: value=1>
<threading.Semaphore at 0x1e4062b5790: value=0>
<threading.Semaphore at 0x1e4062b5790: value=1>
<threading.Semaphore at 0x1e4062b5790: value=2>
<threading.Semaphore at 0x1e4062b5790: value=3>

有限な資源を使う場合や、初期値を超えてセマフォをリリースしないように制御したい場合は、BoundedSemaphoreを使用すると良いでしょう。BoundedSemaphoreは、セマフォの数が初期値を超過するとValueErrorを送出します。

from threading import BoundedSemaphore

# 最大数(初期値)
max_sema = 2

# セマフォを生成
sema = BoundedSemaphore(max_sema)
# 初期値 value=2
print(sema)

# value=1
sema.acquire()
print(sema)
# value=0
sema.acquire()
print(sema)

# value=1
sema.release()
print(sema)
# value=2
sema.release()
print(sema)
# value=3 ← 初期値の最大を超える
sema.release()
print(sema)
【実行結果】
<threading.BoundedSemaphore at 0x28136f56350: value=2/2>
<threading.BoundedSemaphore at 0x28136f56350: value=1/2>
<threading.BoundedSemaphore at 0x28136f56350: value=0/2>
<threading.BoundedSemaphore at 0x28136f56350: value=1/2>
<threading.BoundedSemaphore at 0x28136f56350: value=2/2>

Traceback (most recent call last):
...(省略)...
ValueError: Semaphore released too many times

BoundedSemaphoreの使い方は、通常のSemaphoreと同じなので先ほどのSemaphoreの例は以下のように書き換えることができます。

...(省略)...


def dbaccess(dbname: str, sema: BoundedSemaphore):
    # 有限セマフォを使用
    with sema:
        logging.debug("start")

        ...(省略)...

        logging.debug(f"end: {rows}")


def main():
...(省略)...

    # 有限セマフォを生成
    sema = BoundedSemaphore(max_connections)

...(省略)...

上記の例では、with句を使っていますので、意図的にセマフォ値の超過を起こすことは考えにくいですが、セマフォの値の上限を確実に制限しながらセマフォを使いたい場合は、BoundedSemaphoreを選択すると良いでしょう。

Note

例で使っているsqliteのデータベースは以下のプログラムで簡単に作成できます。sqlite自体の使い方は「SQLiteの基本的な使い方」にまとめていますので興味があれば参考にしてください。

import sqlite3

# データベースに接続する
conn = sqlite3.connect("test.db")
# カーソルを作成する
curs = conn.cursor()

print("===== CREATE TABLE =====")
# テーブルを作成する
curs.execute(
    "CREATE TABLE IF NOT EXISTS person ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT"
    ",name VARCHAR"
    ",age INTEGER)"
)
insert_sql = "INSERT INTO person(name, age) VALUES(?, ?)"
curs.execute(insert_sql, ("TARO", 30))
# コミットする
conn.commit()

# カーソルとコネクションをクローズする
curs.close()
conn.close()

複数スレッドの制御

複数スレッドを制御するための方法としてキュー(queue.Queue)、イベント(Event)、コンディション(Condition)、バリア(Barrier)という方法について紹介します。

キューを用いたスレッド制御 queue.Queue

複数スレッドを用いてデータに対して並列処理したい場合、固定数のスレッドプールを用意し、先入れ先出し(First In First Out: FIFO)のキューを使って順次データを取り出して処理するのが一般的な方法です。

キューを使った制御を行う際にはqueueモジュールのQueueを使用できます。スレッドプールとキューを使った制御の例を以下で見ていきましょう。

import logging
import time
from queue import Empty, Queue
from threading import Thread

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")

# スレッドプールのサイズ
THREAD_POOL_SIZE = 3


def myworker(work_queue):
    logging.debug("start")

    # キューが空でない限り繰り返す
    while not work_queue.empty():
        try:
            item = work_queue.get_nowait()
        except Empty:
            break
        else:
            time.sleep(1)
            logging.debug(item)
            # タスク終了 ↓を書かないとwork_queue.join()で終了を区別できない
            work_queue.task_done()

    logging.debug("end")


def main():
    logging.debug("start")

    # スレッドで共有するキューを用意する
    work_queue = Queue()

    # 表示したい値のリスト
    vals = [i for i in range(10)]
    # キューに投入する
    for val in vals:
        work_queue.put(val)

    # スレッドプール数のスレッドを用意
    threads = [
        Thread(target=myworker, args=(work_queue,))
        for _ in range(THREAD_POOL_SIZE)
    ]

    # スレッドの開始
    for thread in threads:
        thread.start()

    # キューが空になるまで待機
    work_queue.join()

    # 終了を待機
    for thread in threads:
        thread.join()

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
Thread-1 (myworker): start
Thread-2 (myworker): start
Thread-3 (myworker): start
Thread-1 (myworker): 0
Thread-2 (myworker): 1
Thread-3 (myworker): 2
Thread-1 (myworker): 3
Thread-3 (myworker): 5
Thread-2 (myworker): 4
Thread-1 (myworker): 6
Thread-2 (myworker): 8
Thread-2 (myworker): end
Thread-3 (myworker): 7
Thread-3 (myworker): end
Thread-1 (myworker): 9
Thread-1 (myworker): end

上記の例では、0~9までの数値リストを用意し、3つのスレッドに分けて表示しています。処理対象の数値データは以下の部分でキューを用意して投入しています。

from queue import Empty, Queue

...(省略)...

    # スレッドで共有するキューを用意する
    work_queue = Queue()

    # 表示したい値のリスト
    vals = [i for i in range(10)]
    # キューに投入する
    for val in vals:
        work_queue.put(val)

...(省略)...

キューを使用する際には、queueモジュールからQueueをインポートして生成します。キューに値を挿入するには、putメソッドで追加します。

スレッドプールのスレッドを用意している部分を抜粋したのが以下の部分になります。

# スレッドプールのサイズ
THREAD_POOL_SIZE = 3

...(省略)...

    # スレッドプール数のスレッドを用意
    threads = [
        Thread(target=myworker, args=(work_queue,))
        for _ in range(THREAD_POOL_SIZE)
    ]

    # スレッドの開始
    for thread in threads:
        thread.start()

    # キューが空になるまで待機
    work_queue.join()

    # 終了を待機
    for thread in threads:
        thread.join()

...(省略)...

スレッドプールサイズの数だけスレッドを生成し、引数として作成したデータのキューを渡します。スレッドの開始については今までの通りです。キューについては空になるまで待機するjoinメソッドを「work_queue.join()」のように呼び出しています。

では、各スレッド動作でキューを使う部分を見ていきましょう。

from queue import Empty, Queue

...(省略)...

def myworker(work_queue: Queue):
    logging.debug("start")

    # キューが空でない限り繰り返す
    while not work_queue.empty():
        try:
            item = work_queue.get_nowait()
        except Empty:
            break
        else:
            time.sleep(1)
            logging.debug(item)
            # タスク終了 ↓を書かないとwork_queue.join()で終了を区別できない
            work_queue.task_done()

    logging.debug("end")

...(省略)..

各スレッドでは、キューが空でない限り繰り返すwhileループを使用しています。キューのemptyメソッドで「work_queue.empty()」によりキューが空かどうかを判定します。ただし、キューの状態が他のスレッドにより変更される可能性があります。そのため、この判定のみに依存するのは避けるべきです。

キューが空でない場合はget_nowaitメソッドで先頭の要素を取り出します。このメソッドは、キューが空の場合には直ちにEmpty例外を送出します。そのため、例外処理を用いてキューからの取得が失敗した場合の処理を記述しています。

このコード内では「work_queue.task_done()」の部分が重要です。task_doneメソッドはタスクの終了を知らせるもので、呼び出すとキューの内部の未完了数をマイナスします。「work_queue.join()」は、未完了数が0になると後続処理に進みます。そのため、この部分を書き忘れるとwork_queue.join()以降の処理に進まなくなってしまいますので注意してください。

上記のようにキューをうまく使うことで複数スレッドを用いたデータの並列処理を実現できます。

イベント Event

あるスレッドの結果を待ってから、別スレッドを開始させたいような場合には、threadingモジュールのEventを使用できます。以下の例で見てみましょう。

import logging
import time
from threading import Event, Thread

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def myworker(event: Event):
    # event.set()を待機
    event.wait()

    logging.debug("start")
    time.sleep(5)
    logging.debug("end")


def event_trigger(event: Event):
    logging.debug("start")
    time.sleep(5)
    logging.debug("end")

    # wait状態のスレッドを起動
    event.set()


def main():
    logging.debug("start")

    # イベントを生成
    event = Event()

    # スレッドを生成
    trigger_thread = Thread(target=event_trigger, args=(event,))
    thread_num = 3
    threads = [
        Thread(target=myworker, args=(event,)) for _ in range(thread_num)
    ]

    # イベント後に起動するスレッドを開始
    for thread in threads:
        thread.start()

    # イベントのトリガーとなるスレッドを開始
    trigger_thread.start()

    # スレッドの終了を待機
    trigger_thread.join()
    for thread in threads:
        thread.join()

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
Thread-1 (event_trigger): start
Thread-1 (event_trigger): end
Thread-2 (myworker): start
Thread-4 (myworker): start
Thread-3 (myworker): start
Thread-3 (myworker): end
Thread-4 (myworker): end
Thread-2 (myworker): end
MainThread: end

上記例では、event_trigger関数が動くスレッドが完了すると、3つのmyworker関数のスレッドが同時に開始されることが分かります。

Eventは、以下のように生成して関連するスレッドに引数として渡します。

...(省略)...

    # イベントを生成
    event = Event()

    # スレッドを生成
    trigger_thread = Thread(target=event_trigger, args=(event,))
    thread_num = 3
    threads = [
        Thread(target=myworker, args=(event,)) for _ in range(thread_num)
    ]

...(省略)...

後続で動作するスレッドのmyworker関数では、イベントが設定されるまでwaitメソッドで待機します。

...(省略)...

def myworker(event: Event):
    # event.set()を待機
    event.wait()

    logging.debug("start")
    time.sleep(5)
    logging.debug("end")

...(省略)...

トリガーとなるスレッドのevent_trigger関数では、処理が終わったタイミングでsetメソッドを呼び出します。これにより、waitメソッドで待機していたスレッドが同時に起動します。

...(省略)...

def event_trigger(event: Event):
    logging.debug("start")
    time.sleep(5)
    logging.debug("end")

    # wait状態のスレッドを起動
    event.set()

...(省略)...

さらに、Eventには、clearメソッドがあり、これを使ってイベントのフラグをリセットすることが可能です。これは、特定の状態で再びスレッドを待機させたい場合などに利用できます。

このようにEventを使用することで、特定の順序でスレッドの実行を制御することができます。特に、ある処理Aを完了させてから、その結果を利用して別のスレッドで処理Bを実行したい場合などにEventを使用すると非常に便利です。

コンディション Condition

Eventに似たものとしてthreadingモジュールにはConditionというものがあります。Eventとの主な違いは、Conditionは内部的なロックを持っており、一度に1つのスレッドのみがConditionのブロック内を実行できるという点です。

以下は、先ほどのEventの例をConditionで書いた例です。

import logging
import time
from threading import Condition, Thread

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def myworker(condition: Condition):
    with condition:
        # condition.notify_all()を待機
        condition.wait()

        logging.debug("start")
        time.sleep(5)
        logging.debug("end")


def condition_trigger(condition: Condition):
    with condition:
        logging.debug("start")
        time.sleep(5)
        logging.debug("end")
        
        # wait状態のスレッドを起動
        condition.notify_all()


def main():
    logging.debug("start")

    # コンディションを生成
    condition = Condition()

    # スレッドを生成
    trigger_thread = Thread(target=condition_trigger, args=(condition,))
    thread_num = 3
    threads = [
        Thread(target=myworker, args=(condition,)) for _ in range(thread_num)
    ]

    # トリガーとなるスレッド実行後に起動するスレッドを開始
    for thread in threads:
        thread.start()

    # トリガーとなるスレッドを開始
    trigger_thread.start()

    # スレッドの終了を待機
    trigger_thread.join()
    for thread in threads:
        thread.join()

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
Thread-1 (condition_trigger): start
Thread-1 (condition_trigger): end
Thread-3 (myworker): start
Thread-3 (myworker): end
Thread-2 (myworker): start
Thread-2 (myworker): end
Thread-4 (myworker): start
Thread-4 (myworker): end
MainThread: end

先ほどのEventではトリガーとなるイベントが完了した際に、後続のスレッドが同時に開始しましたが、Conditionを使った例では、待機していた後続スレッドが順次実行されます。これは、後続の各スレッドがロックを伴って動作するためです。

Condtionは、Eventと使い方はほとんど同じですが、重要な違いがあります。Eventでは、waitメソッドとsetメソッドを使用しましたが、Conditionでは、waitメソッドとnotify_allメソッドを使用します。さらに、Conditionオブジェクトは、with句を使用する必要があります。これにより、内部のロックの取得と解放が自動的に行われます。

...(省略)...

def myworker(condition: Condition):
    with condition:
        # condition.notify_all()を待機
        condition.wait()

        logging.debug("start")
        time.sleep(5)
        logging.debug("end")


def condition_trigger(condition: Condition):
    with condition:
        logging.debug("start")
        time.sleep(5)
        logging.debug("end")
        
        # wait状態のスレッドを起動
        condition.notify_all()

...(省略)...

このようにスレッドの順序関係を制御しつつ、ロックを併用する場合では、Conditionを使用するとよいでしょう。

バリア Barrier

EventConditionと似たような同期機構として、threadingモジュールにはBarrierというものがあります。Barrierは、複数のスレッドが特定の同期点に達するまで待機し、その後すべてのスレッドが同時に動作を再開するための機構です。

つまり、Barrierで指定した数のスレッドがすべてwaitメソッドを呼び出すまで、いずれのスレッドも進行できなくなります。以下の例で見てみましょう。

import logging
import time
from threading import Barrier, Thread

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def server(barrier: Barrier):
    logging.debug("start")

    # バリア数スレッドが起動するまで待機
    barrier.wait()

    while True:
        logging.debug("execution")
        time.sleep(5)


def client(barrier: Barrier):
    time.sleep(5)
    logging.debug("start")

    # バリア数スレッドが起動するまで待機
    barrier.wait()

    while True:
        logging.debug("execution")
        time.sleep(5)


def main():
    logging.debug("start")

    # バリアを生成
    barrier = Barrier(2)

    # スレッドの生成
    server_thread = Thread(target=server, args=(barrier,))
    client_thread = Thread(target=client, args=(barrier,))

    # スレッドを開始
    server_thread.start()
    client_thread.start()

    # スレッドの終了を待機
    server_thread.join()
    client_thread.join()

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
Thread-1 (server): start
Thread-2 (client): start
Thread-2 (client): execution
Thread-1 (server): execution
Thread-1 (server): execution
Thread-2 (client): execution
...(継続)...

上記例では、client関数の起動はtime.sleepで故意に遅らせています。server関数が先に開始しますが、Barrierで定義したスレッド数に達していないため、後続の処理はすぐには動きません。

client側のスレッドが起動し、Barrierで指定したスレッド数に達した時に、serverおよびclientの両方が動作を開始します。また、Barrierは指定した数のスレッドがwait()を呼び出した後、自動的にリセットされ、再度同じ数のスレッドがwait()を呼び出すのを待機する状態となります。

なお、上記スレッドの動作は、起動した後は無限ループになっているため手動で終了する必要があります。この点に注意してください。

使い方に関して、EventConditionとの違いは、Barrierの初期化時に動作を開始する条件となるスレッド数を指定する点です。

from threading import Barrier, Thread

...(省略)...

    # バリアを生成
    barrier = Barrier(2)

...(省略)...

各スレッドでは、以下のようにwaitメソッドを呼び出します。

...(省略)...

def server(barrier: Barrier):
    logging.debug("start")

    # バリア数スレッドが起動するまで待機
    barrier.wait()

    while True:
        logging.debug("execution")
        time.sleep(5)


def client(barrier: Barrier):
    time.sleep(5)
    logging.debug("start")

    # バリア数スレッドが起動するまで待機
    barrier.wait()

    while True:
        logging.debug("execution")
        time.sleep(5)

...(省略)...

上記のように、複数のスレッドが同期して動作を開始する必要がある場合、例えば、サーバーとクライアントを協調して動作するようなケースではBarrierを使用するとよいでしょう。

multiprocessing.dummyによるマルチスレッド

Pythonには、マルチプロセスを実現するmultiprocessingモジュールがありますが、同じAPIを利用してマルチスレッドを実現したい場合には、multiprocessing.dummyが便利です。これにより、コードの大部分を変更することなく、プロセスの代わりにスレッドを使用して並列処理を行うことができます。

上記でqueue.Queueを利用して作成したスレッドプールと同じような動作をPoolクラスで実現できます。

import logging
import time
from multiprocessing.dummy import Pool as ThreadPool

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")

# スレッドプールのサイズ
THREAD_POOL_SIZE = 3


def myworker(x: int):
    logging.debug("start")

    time.sleep(1)

    logging.debug(f"end: {x}")


def main():
    logging.debug("start")

    # 表示したい値のリスト
    vals = [i for i in range(10)]

    # スレッドプールの使用
    with ThreadPool(THREAD_POOL_SIZE) as pool:
        results = pool.map_async(myworker, vals)

        # map_asyncは非同期なので↓はすぐに表示
        logging.debug("execute")

        # getを実行したら動作する
        logging.debug(results)
        results.get()

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
MainThread: execute
MainThread: <multiprocessing.pool.MapResult object at 0x000002A00C6B1ED0>
Thread-1 (worker): start
Thread-2 (worker): start
Thread-3 (worker): start
Thread-1 (worker): end: 0
Thread-1 (worker): start
Thread-3 (worker): end: 2
Thread-3 (worker): start
Thread-2 (worker): end: 1
Thread-2 (worker): start
Thread-1 (worker): end: 3
Thread-1 (worker): start
Thread-3 (worker): end: 4
Thread-3 (worker): start
Thread-2 (worker): end: 5
Thread-2 (worker): start
Thread-1 (worker): end: 6
Thread-1 (worker): start
Thread-3 (worker): end: 7
Thread-2 (worker): end: 8
Thread-1 (worker): end: 9
MainThread: end

上記では、PoolクラスをThreadPoolとしてインポートしています。ThreadPoolwith句で、map_asyncメソッドに実行する関数と引数を渡すことでマルチスレッドで処理を実行できます。実行すると分かりますが、THREAD_POOL_SIZEに指定した数までのスレッドしか生成されないことが分かります。

Poolクラスは、multiprocessingモジュールでも中心的なものです。「multiprocessingによるマルチプロセスの基本」でプロセスプールの説明として使い方を記載していますので、あわせて確認してもらえるとよいかと思います。

上記のように、multiprocessingモジュールはマルチプロセスとマルチスレッドの両方で共通のAPIを持っています。マルチスレッドかマルチプロセスかは、どちらかが必ず優れていると言えるようなものではありません。解決する問題や要件に合わせて適切な手法を選択し、十分な検討を重ねることが大切です。

まとめ

Pythonでマルチスレッド処理を行うためのthreadingモジュールの使い方について解説しました。

threadingモジュールを使用すると、マルチスレッド処理を簡単に実現できます。このモジュールには、ロック(Lock, RLock)やセマフォ(Semaphore, BoundedSemaphore)などの排他制御機能、さらにスレッド間の制御を支援するEvent, Condition, Barrierといった仕組みが備わっています。また、queueモジュールのQueueと組み合わせることで、キューを活用したデータの並列処理も行うことができます。

また、マルチプロセスを実現するmultiprocessingモジュールの中に、スレッドを利用するためのmultiprocessing.dummyが存在しているため、こちらについても紹介しました。

Pythonでは、グローバルインタープリタロック(GIL)の存在により、CPUバウンドな処理でのマルチスレッドの効果は限定的です。しかし、I/Oバウンドな処理でマルチスレッドは有効であり、その使い方を理解することは非常に価値があります。

並列処理は複雑ですが、マルチスレッド処理を実践的に使いこなせるようになることをおすすめします。

Note

threadingモジュールの公式ドキュメントはこちらを参照してください。