threading

【Python】threadingによるマルチスレッド処理の基本

【Python】threadingによるマルチスレッド処理の基本

Pythonでマルチスレッド処理を行うためのthreadingモジュールの使い方について解説します。

Pythonによるマルチスレッド処理 threading

マルチスレッド処理とは、1つのメインのプロセスからメモリ空間を共有しつつ、複数のスレッドで並列処理することを言います。Pythonには、マルチスレッド処理のためのthreadingモジュールが提供されています。

Pythonには、CPython、Jython、IronPythonなどの複数の実装がありますが、最も広く使用されているPythonの実装は、C言語をベースにしたCPythonです。

CPythonの実装には、マルチスレッドを多くの状況で効果的に使用できないという大きな制限が存在します。これは、グローバルインタープリタロック(GIL)として知られています。GILは、全てのPython実装に共通する性質ではなく、C言語で実装されたCPython特有のものであることを理解しておくべきです。例えば、Javaで実装されたJythonにはGILは存在しません。

GILの存在下で、Pythonオブジェクトにアクセスする際の処理は、このグローバルなロックにより、1つのスレッドに限定されます。そのため、PythonではCPUバウンドな処理において、マルチスレッドを利用しても、スレッドがシリアル化されて順次実行されるため、高速化されることはほとんどありません。この場合、マルチスレッドではなく、マルチプロセスの実装を検討した方が良い場合があります。

しかし、Pythonのマルチスレッド処理が全く意味がないというわけではありません。GILを避けるケースもあり、I/Oバウンドな処理、例えばDBからの応答を待つような場面では、マルチスレッドが効果的です。このようなケースでは、マルチスレッド処理の知識は非常に価値があります。

本記事では、threadingモジュールを用いたマルチスレッド処理の基本について紹介します。

Pythonでマルチプロセスを実現するmultiprocessingモジュールの使い方については「multiprocessingによるマルチプロセスの基本」でまとめていますので興味があれば参考にしてください。

threadingの使い方

基本的な使い方 Thread

threadingモジュールを使って、複数のスレッドを生成し、処理を実行する基本的な使い方を以下の例でみていきましょう。

import logging
import time
from threading import Thread

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def myworker1():
    logging.debug("start")
    time.sleep(5)
    logging.debug("end")


def myworker2():
    logging.debug("start")
    time.sleep(5)
    logging.debug("end")


def main():
    logging.debug("start")

    # スレッドの生成
    thread1 = Thread(target=myworker1)
    thread2 = Thread(target=myworker2)

    # スレッドの開始
    thread1.start()
    thread2.start()

    # スレッドの終了を待機
    thread1.join()
    thread2.join()

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
Thread-1 (myworker1): start
Thread-2 (myworker2): start
Thread-1 (myworker1): end
Thread-2 (myworker2): end
MainThread: end

上記例は、myworker1とmyworker2という関数を別のスレッドで並列処理するプログラムになっています。実行してみていただくとThread-1とThread-2が動くことが分かるかと思います。上記のプログラムでのスレッドを簡単に絵に書いてみると以下のようになっています。

Python threading Thread

スレッドの生成は、threadingモジュールのThreadを使います。以下のようにtarget引数に各スレッドで実行する関数を渡します。

    # スレッドの生成
    thread1 = Thread(target=myworker1)
    thread2 = Thread(target=myworker2)

スレッドの開始は、それぞれでstartメソッドを実行します。

    # スレッドの開始
    thread1.start()
    thread2.start()

各スレッドの終了を待機するのがjoinメソッドです。スレッドが終了したら、joinより下のコードが実行されます。

    # スレッドの終了を待機
    thread1.join()
    thread2.join()

スレッド内の出力では以下の部分で定義しているようにloggingを使用しています。loggingのformatでthreadNameを指定すると分かりやすく表示できます。

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")

loggingについては「loggingの基本的な使い方」でまとめていますので興味があれば参考にしてください。

上記のようにThreadを使うことでマルチスレッド処理を書いていくことができます。

スレッド名称をつけたい場合

上記例では、スレッド名称(threadName)が「Thread-1」「Thread-2」というようになっていました。スレッドに個別に名称をつけたい場合には、Thread生成時に以下のようにname引数で指定します。(変更のない部分は省略します)

...(省略)...

    # スレッドの生成
    thread1 = Thread(target=myworker1, name="myworker1")
    thread2 = Thread(target=myworker2, name="myworker2")

...(省略)...
【実行結果】
MainThread: start
myworker1: start
myworker2: start
myworker1: end
myworker2: end
MainThread: end

name引数で指定することで、loggingで出力しているthreadNameの部分は指定した名称に変わります。

スレッドで動作する関数への引数の渡し方

スレッドで動作する関数へ引数を渡したい場合には、Threadの生成時に以下のようにargs引数とkwargs引数を使用します。(変更のない部分は省略します)

...(省略)...

def myworker1(x: int, y: int):
    logging.debug("start")
    logging.debug(f"x: {x}, y:{y}")
    time.sleep(5)
    logging.debug("end")


...(省略)...

    # スレッドの生成
    thread1 = Thread(target=myworker1, args=(10,), kwargs={"y": 20})

...(省略)...
【実行結果】
MainThread: start
Thread-1 (myworker1): start
Thread-1 (myworker1): x: 10, y:20
Thread-2 (myworker2): start
Thread-1 (myworker1): end
Thread-2 (myworker2): end
MainThread: end

上記例では、myworker1の方の関数に引数としてintのx, yがあります。この関数に引数を渡す場合には、argsにタプルで位置引数で指定するか、kwargsに辞書でキーワード引数として指定することができます。argsはタプルなので、1つ引数を渡す場合は上記の(10,)のようにカンマ(,)がいるので注意しましょう。

実行結果を見てもわかるようにThread-1の方では、メインスレッドから引数として渡した情報を表示できていることが分かります。

デーモンとしてのスレッド実行方法

スレッドを「デーモンスレッド」にする場合には、以下のようにdaemon引数にTrueを設定します。デフォルトはFalseです。

デーモンとは、メインメモリ上に常駐して機能を提供するプログラムのことで、UNIX系OSで一般的な呼び方です。デーモンでない生存中のスレッドが全てなくなるとPythonプログラムは終了します。

import logging
import time
from threading import Thread

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def myworker1():
    logging.debug("start")
    time.sleep(10)
    logging.debug("end")


def myworker2():
    logging.debug("start")
    time.sleep(5)
    logging.debug("end")


def main():
    logging.debug("start")

    # スレッドの生成
    # thread1をデーモンに設定
    thread1 = Thread(target=myworker1, daemon=True)
    # thread2はそのまま
    thread2 = Thread(target=myworker2)

    # スレッドの開始
    thread1.start()
    thread2.start()

    # スレッドの終了まで待機
    # thread1.join()
    thread2.join()

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
Thread-1 (myworker1): start
Thread-2 (myworker2): start
Thread-2 (myworker2): end
MainThread: end

上記例では、Thread-1の方のみdaemon引数をTrueにしています。daemonは、以下のようにThread生成後にdaemonプロパティにTrueを設定しても構いません。

    thread1 = Thread(target=myworker1)
    thread1.daemon = True

これまでの例と少し変わっている点としては、myworker1のsleep時間を10sとして、myworker2よりも後に終わるようにしていることです。また、thread1.join()をコメントアウトしています。

結果を見てみると分かる通り、デーモンプロセスであるThread-1が終了する前にMainThreadが終了していることが分かります。これは、Thread-2が終了した時点でデーモンでない生存中のスレッドがなくなったためです。

なお、join()を記載するとスレッド終了まで待機するため、コメントアウトしているthread1.join()のコメントアウトを外すと以下のようにThread-1が終了してからMainThreadが終了します。

【実行結果】
MainThread: start
Thread-1 (myworker1): start
Thread-2 (myworker2): start
Thread-2 (myworker2): end
Thread-1 (myworker1): end
MainThread: end

タイマーを用いたスレッド実行

スレッドにタイマーを設定して実行したい場合には、以下のようにThreadの代わりにTimerを使用できます。

import logging
import time
from threading import Timer

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def myworker(x: int, y: int):
    logging.debug("start")
    time.sleep(5)
    logging.debug("end")


def main():
    logging.debug("start")

    # タイマーで5秒後に実行開始
    thread = Timer(5, myworker, args=(10,), kwargs={"y": 20})

    # スレッドの開始
    thread.start()

    # スレッドの終了まで待機
    thread.join()

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
Thread-1: start
Thread-1: end
MainThread: end

Timerは、Threadのサブクラスとなっているので、使い方もほとんど同じで、違いは最初の引数としてタイマー時間(秒)を指定することです。関数への引数などもThread同様に渡すことができます。

実行してみていただければ分かりますが「MainThread: start」が表示されて5秒後に「Thread-1: start」が表示されて処理が進みます。

スレッドの排他制御

マルチスレッド処理では、スレッド間でメモリを共有するためしっかりと排他制御をしないと思わぬ不具合を引き起こします。以下の簡単な例で問題が起こる場合を見てみましょう。

import logging
import time
from threading import Thread

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def mycounter(d):
    logging.debug("start")

    tmp = d["x"]
    time.sleep(5)
    d["x"] = tmp + 1

    logging.debug(f"end {d}")


def main():
    logging.debug("start")
    data = {"x": 0}

    # スレッドの生成
    thread_num = 5
    threads = [
        Thread(target=mycounter, args=(data,)) for _ in range(thread_num)
    ]

    # スレッドの開始
    for thread in threads:
        thread.start()

    # スレッドの終了まで待機
    for thread in threads:
        thread.join()

    logging.debug(data)
    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
Thread-1 (mycounter): start
Thread-2 (mycounter): start
Thread-3 (mycounter): start
Thread-4 (mycounter): start
Thread-5 (mycounter): start
Thread-1 (mycounter): end {'x': 1}
Thread-2 (mycounter): end {'x': 1}
Thread-4 (mycounter): end {'x': 1}
Thread-3 (mycounter): end {'x': 1}
Thread-5 (mycounter): end {'x': 1}
MainThread: {'x': 1}
MainThread: end

上記例では、”x”というキーで値が0という要素を持っているdataという辞書を用意しています。スレッドを5つ用意し、mycounter関数へdataを渡すことで処理の実行カウントを取ろうとしています。結果としては5になってもらいたいと想定しましたが、結果として{‘x’: 1}という形になってしまっています。

この場合、5つのスレッドは「thread.start()」でほぼ同じタイミングで開始しているわけですが、その際にtmpという引数にd[“x”]の値を退避してしまっています。このタイミングでは、5つのスレッドではいずれもtmp=0です。その後、5秒の処理後にtmpに1を足した値を”x”のキーの値として更新しているため、すべてのスレッドで1になってしまっています。

これは複数スレッドが同じ辞書に同時にアクセスしてしまっているために発生しています。このような状況を競合状態(race hazard または race condition)と言います。マルチスレッド処理ではこのような状況にならないように、うまく排他制御をする必要があります。

threadingモジュールでは、排他制御として実行中は他スレッドが入ってこれないようにする仕組みが用意されています。以下でthreadingモジュールで提供されているロック(LockRLock)とセマフォ(Semaphore)について紹介していきます。

ロックの取得 Lock

上記例を用いつつ、threadingモジュールのLockを使ってロックを取得する方法を紹介します。Lockは以下の例のように使用します。

import logging
import time
from threading import Lock, Thread

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def mycounter(d: dict, lock: Lock):
    # ロックを取得
    with lock:
        logging.debug("start")

        tmp = d["x"]
        time.sleep(1)
        d["x"] = tmp + 1

        logging.debug(f"end {d}")

    # # with句を使わない場合の以下と同じ
    # # ロックを取得
    # lock.acquire()
    # # 以下のブロックがロックされる
    # tmp = d["x"]
    # time.sleep(1)
    # d["x"] = tmp + 1
    # # ロックをリリース
    # lock.release()


def main():
    logging.debug("start")
    data = {"x": 0}

    # ロックを生成
    lock = Lock()

    # スレッドの生成
    thread_num = 5
    threads = [
        Thread(target=mycounter, args=(data, lock)) for _ in range(thread_num)
    ]

    # スレッドの開始
    for thread in threads:
        thread.start()

    # スレッドの終了まで待機
    for thread in threads:
        thread.join()

    logging.debug(data)
    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
Thread-1 (mycounter): start
Thread-1 (mycounter): end {'x': 1}
Thread-2 (mycounter): start
Thread-2 (mycounter): end {'x': 2}
Thread-3 (mycounter): start
Thread-3 (mycounter): end {'x': 3}
Thread-4 (mycounter): start
Thread-4 (mycounter): end {'x': 4}
Thread-5 (mycounter): start
Thread-5 (mycounter): end {'x': 5}
MainThread: {'x': 5}
MainThread: end

上記実行してみると分かりますが、スレッドが順に実行されていき、あるスレッドがロックを取得している間は他スレッドは止まっていることが分かります。

ロックを取得するためには、threadingモジュールのLockクラスを使用します。main関数の以下部分で、Lockクラスのインスタンスを生成し、各スレッドに引数として渡しています。

...(省略)...
 
   data = {"x": 0}

    # ロックを生成
    lock = Lock()

    # スレッドの生成
    thread_num = 5
    threads = [
        Thread(target=mycounter, args=(data, lock)) for _ in range(thread_num)
    ]

...(省略)...

具体的にロックを取得しているのは以下の部分で、Lockのインスタンスでwith句を使っています。このようにするとwith句の中では、ロックが取得され、ブロック内の処理が終わるとロックが解放されます。

    # ロックを取得
    with lock:
        logging.debug("start")

        tmp = d["x"]
        time.sleep(1)
        d["x"] = tmp + 1

        logging.debug(f"end {d}")

withを使わないで以下のように書くことも可能です。acquireメソッドでロックを取得し、releaseメソッドでロックを解放します。

    # ロックを取得
    lock.acquire()
    # 以下のブロックがロックされる
    tmp = d["x"]
    time.sleep(1)
    d["x"] = tmp + 1
    # ロックをリリース
    lock.release()

特に何か理由がない限りは自動でロックの解放までしてくれるwith句を使うのが適切かと思いますが、明示的にロックの取得や解放ができることは覚えておいてください。

以上のようにLockを使うことで、複数スレッドを排他制御しながら処理を進めることができます。

入れ子のロック取得 RLock

Lockを使うことでロックを取得できることができました。入れ子状にロックを取得したい場合はどうすればよいでしょうか。Lockを使って入れ子でロックを取得しようとすると、最初に取得したロックのため、内部のロックを取得しようとしても取得ができずに処理が止まってしまいます。

入れ子状のロックを取得するためのクラスとしてthreadingモジュールのRLockクラスが使用できます。RLockは以下のように使用します。

import logging
import time
from threading import RLock, Thread

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def mycounter(d: dict, lock: RLock):
    # ロックを取得
    with lock:
        tmp = d["x"]
        time.sleep(1)
        d["x"] = tmp + 1
        # ロック内で再度ロックを取得する
        with lock:
            tmp = d["x"]
            time.sleep(1)
            d["x"] = tmp + 1


def main():
    logging.debug("start")
    data = {"x": 0}

    # ロックを生成
    lock = RLock()

    # スレッドの生成
    thread_num = 5
    threads = [
        Thread(target=mycounter, args=(data, lock)) for _ in range(thread_num)
    ]

    # スレッドの開始
    for thread in threads:
        thread.start()
    # 終了を待機
    for thread in threads:
        thread.join()

    logging.debug(data)
    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
Thread-1 (mycounter): start
Thread-1 (mycounter): end {'x': 2}
Thread-2 (mycounter): start
Thread-2 (mycounter): end {'x': 4}
Thread-3 (mycounter): start
Thread-3 (mycounter): end {'x': 6}
Thread-4 (mycounter): start
Thread-4 (mycounter): end {'x': 8}
Thread-5 (mycounter): start
Thread-5 (mycounter): end {'x': 10}
MainThread: {'x': 10}
MainThread: end

RLockの使用方法はLockと同じです。mycounter内ではwith句を入れ子状にして、それぞれのロックしたwith句で辞書の値を加算しています。

このような入れ子状では、RLockを使用することで順にロック取得、リリースをしてくれるため処理を継続することが可能です。(Lockではできないので注意しましょう)

セマフォ Semaphore

ロックの他に排他制御の仕組みとしてセマフォがあります。セマフォは、共有リソース上で同時に操作できるプロセスやスレッドの数を示すもので、セマフォにより同時実行数を制限できます。

Pythonでは、セマフォ実現のためにthreadingモジュールのSemaphoreが使用できます。以下例はデータベースへのアクセス数をセマフォを使って一定数に制限するような例です。

import logging
import sqlite3
from threading import Semaphore, Thread

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def dbaccess(dbname: str, sema: Semaphore):
    # セマフォを使用
    with sema:
        logging.debug("start")

        # DB接続
        conn = sqlite3.connect(dbname)
        curs = conn.cursor()

        # DBを検索
        select_all_sql = "SELECT * FROM person"
        curs.execute(select_all_sql)
        rows = curs.fetchall()

        # DBをクローズ
        curs.close()
        conn.close()
        logging.debug(f"end: {rows}")


def main():
    logging.debug("start")

    # データベース名
    dbname = "test.db"
    # 最大コネクション数
    max_connections = 3

    # セマフォを生成
    sema = Semaphore(max_connections)

    # スレッドの生成
    thread_num = 5
    threads = [
        Thread(target=dbaccess, args=(dbname, sema)) for _ in range(thread_num)
    ]

    # スレッドの開始
    for thread in threads:
        thread.start()
    # 終了を待機
    for thread in threads:
        thread.join()

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
Thread-1 (dbaccess): start
Thread-2 (dbaccess): start
Thread-3 (dbaccess): start
Thread-2 (dbaccess): end: [(1, 'TARO', 30)]
Thread-4 (dbaccess): start
Thread-3 (dbaccess): end: [(1, 'TARO', 30)]
Thread-5 (dbaccess): start
Thread-1 (dbaccess): end: [(1, 'TARO', 30)]
Thread-4 (dbaccess): end: [(1, 'TARO', 30)]
Thread-5 (dbaccess): end: [(1, 'TARO', 30)]
MainThread: end

上記例では、test.dbというsqliteのDBをあらかじめ用意しておいて、Semaphoreを使って最大コネクション数を3に制御し、それぞれのスレッドでSELECTを実行しています。

生成しているスレッド数は5ですが、実行を開始するとまずThread-1, Thread-2, Thread-3の3つのスレッドが開始します。その後、Thread-2が終了するとThread-4が開始し、Thread-3が終了するとThread-5が開始しているため、同時に起動しているスレッド数は3に制限されています。

Semaphoreの使用方法は、これまでのLockやRLockの使い方とほとんど同じです。異なる点としては、生成時に以下のように同時実行の最大数を指定していることです。

    # 最大コネクション数
    max_connections = 3

    # セマフォを生成
    sema = Semaphore(max_connections)

上記のようにセマフォを使用すると、同時実行数を制御できるため容量に限りある資源を保護するためによく使われます。

有限セマフォ BoundedSemaphore

Semaphoreの上記例ではwith句を使っていましたが、Lockで紹介したと同様にacquire、releaseを使ってセマフォの数を操作できます。acquireが実行されるとセマフォの数がマイナスされ、releaseが実行されるとセマフォの数がプラスされることで同時実行数を制御しています。

通常のSemaphoreは、セマフォ値が0を下回ることができない一方、初期値を超えても問題ありません。つまり、releaseを複数回実行すると以下のように初期値よりも多い数とすることができます。

from threading import Semaphore

# 最大数(初期値)
max_sema = 2

# セマフォを生成
sema = Semaphore(max_sema)
# 初期値 value=2
print(sema)

# value=1
sema.acquire()
print(sema)
# value=0
sema.acquire()
print(sema)

# value=1
sema.release()
print(sema)
# value=2
sema.release()
print(sema)
# value=3 ← 初期値の最大を超える
sema.release()
print(sema)
【実行結果】
<threading.Semaphore at 0x1e4062b5790: value=2>
<threading.Semaphore at 0x1e4062b5790: value=1>
<threading.Semaphore at 0x1e4062b5790: value=0>
<threading.Semaphore at 0x1e4062b5790: value=1>
<threading.Semaphore at 0x1e4062b5790: value=2>
<threading.Semaphore at 0x1e4062b5790: value=3>

有限な資源を使う場合には、初期値で設定したい値を超過しないようにチェックをしながらセマフォを使いたくなります。この場合は、有限セマフォであるBoundedSemaphoreを使用します。BoundedSemaphoreでは、セマフォの数が初期値を超過すると以下のようにValueErrorを送出します。

from threading import BoundedSemaphore

# 最大数(初期値)
max_sema = 2

# セマフォを生成
sema = BoundedSemaphore(max_sema)
# 初期値 value=2
print(sema)

# value=1
sema.acquire()
print(sema)
# value=0
sema.acquire()
print(sema)

# value=1
sema.release()
print(sema)
# value=2
sema.release()
print(sema)
# value=3 ← 初期値の最大を超える
sema.release()
print(sema)
【実行結果】
<threading.BoundedSemaphore at 0x28136f56350: value=2/2>
<threading.BoundedSemaphore at 0x28136f56350: value=1/2>
<threading.BoundedSemaphore at 0x28136f56350: value=0/2>
<threading.BoundedSemaphore at 0x28136f56350: value=1/2>
<threading.BoundedSemaphore at 0x28136f56350: value=2/2>

Traceback (most recent call last):
...(省略)...
ValueError: Semaphore released too many times

BoundedSemaphoreの使い方は、Semaphoreと同じなため、先ほどのSemaphoreの例はで以下のように書き換えることができます。

...(省略)...


def dbaccess(dbname: str, sema: BoundedSemaphore):
    # 有限セマフォを使用
    with sema:
        logging.debug("start")

        ...(省略)...

        logging.debug(f"end: {rows}")


def main():
...(省略)...

    # 有限セマフォを生成
    sema = BoundedSemaphore(max_connections)

...(省略)...

上記例ではwithを使っているので超過を起こすことはないと思いますが、上限を確実に制限しながらセマフォを使いたい場合は、BoundedSemaphoreを使用するようにするとよいでしょう。

Note

例で使っているsqliteのデータベースは以下のプログラムで簡単に作成できます。sqlite自体の使い方は「SQLiteの基本的な使い方」にまとめていますので興味があれば参考にしてください。

import sqlite3

# データベースに接続する
conn = sqlite3.connect("test.db")
# カーソルを作成する
curs = conn.cursor()

print("===== CREATE TABLE =====")
# テーブルを作成する
curs.execute(
    "CREATE TABLE IF NOT EXISTS person ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT"
    ",name VARCHAR"
    ",age INTEGER)"
)
insert_sql = "INSERT INTO person(name, age) VALUES(?, ?)"
curs.execute(insert_sql, ("TARO", 30))
# コミットする
conn.commit()

# カーソルとコネクションをクローズする
curs.close()
conn.close()

複数スレッドの制御

複数スレッドを制御するための方法としてキュー(queue.Queue)、イベント(Event)、コンディション(Condition)、バリア(Barrier)という方法について紹介します。

キューを用いたスレッド制御 queue.Queue

複数スレッドを用いてデータに対して並列処理したい場合には、大きさを定義したスレッドプールを用意し、並列処理をそこで扱うということを良く行います。この際には、固定数のスレッドを用意し、先入れ先出し(First In First Out: FIFO)のキューを使って順次データを取り出して処理するのが一般的な方法です。

キューを使った制御を行う際にはqueueモジュールのQueueを使用できます。スレッドプールとキューを使った制御の例を以下で見ていきましょう。

import logging
import time
from queue import Empty, Queue
from threading import Thread

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")

# スレッドプールのサイズ
THREAD_POOL_SIZE = 3


def myworker(work_queue: Queue):
    logging.debug("start")

    # キューが空でない限り繰り返す
    while not work_queue.empty():
        try:
            item = work_queue.get()
        except Empty:
            break
        else:
            time.sleep(1)
            logging.debug(item)
            # タスク終了 ↓を書かないとwork_queue.join()で終了を区別できない
            work_queue.task_done()

    logging.debug("end")


def main():
    # スレッドで共有するキューを用意する
    work_queue = Queue()

    # 表示したい値のリスト
    vals = [i for i in range(10)]
    # キューに投入する
    for val in vals:
        work_queue.put(val)

    # スレッドプール数のスレッドを用意
    threads = [
        Thread(target=myworker, args=(work_queue,))
        for _ in range(THREAD_POOL_SIZE)
    ]

    # スレッドの開始
    for thread in threads:
        thread.start()

    # キューが空になるまで待機
    work_queue.join()

    # 終了を待機
    for thread in threads:
        thread.join()


if __name__ == "__main__":
    main()
【実行結果】
Thread-1 (myworker): start
Thread-2 (myworker): start
Thread-3 (myworker): start
Thread-1 (myworker): 0
Thread-2 (myworker): 1
Thread-3 (myworker): 2
Thread-1 (myworker): 3
Thread-3 (myworker): 5
Thread-2 (myworker): 4
Thread-1 (myworker): 6
Thread-2 (myworker): 8
Thread-2 (myworker): end
Thread-3 (myworker): 7
Thread-3 (myworker): end
Thread-1 (myworker): 9
Thread-1 (myworker): end

上記実行してみていただくと分かりますが、順番に先頭の0からの数値が各スレッドに振り分けられて表示がされていくことが分かるかと思います。

この例では、0~9までの数値リストを用意し、3つのスレッドに分けて表示しています。処理対象の数値データは以下の部分でキューを用意して投入しています。

from queue import Empty, Queue

...(省略)...

    # スレッドで共有するキューを用意する
    work_queue = Queue()

    # 表示したい値のリスト
    vals = [i for i in range(10)]
    # キューに投入する
    for val in vals:
        work_queue.put(val)

...(省略)...

キューを使用する際には、queueモジュールからQueueをインポートして生成します。キューに値を挿入するには、putメソッドで追加します。

スレッドプールのスレッドを用意している部分を抜粋したのが以下の部分になります。

# スレッドプールのサイズ
THREAD_POOL_SIZE = 3

...(省略)...

    # スレッドプール数のスレッドを用意
    threads = [
        Thread(target=myworker, args=(work_queue,))
        for _ in range(THREAD_POOL_SIZE)
    ]

    # スレッドの開始
    for thread in threads:
        thread.start()

    # キューが空になるまで待機
    work_queue.join()

    # 終了を待機
    for thread in threads:
        thread.join()

...(省略)...

スレッドプールサイズの数だけスレッドを生成し、引数として作成したデータのキューを渡します。スレッドの開始については今までの通りです。キューについては空になるまで待機するjoinメソッドを「work_queue.join()」のように呼び出しています。

では、各スレッド動作でキューを使う部分を見ていきましょう。

from queue import Empty, Queue

...(省略)...

def myworker(work_queue: Queue):
    logging.debug("start")

    # キューが空でない限り繰り返す
    while not work_queue.empty():
        try:
            item = work_queue.get()
        except Empty:
            break
        else:
            time.sleep(1)
            logging.debug(item)
            # タスク終了 ↓を書かないとwork_queue.join()で終了を区別できない
            work_queue.task_done()

    logging.debug("end")

...(省略)..

各スレッドでは、キューが空でない限り繰り返すwhileループを使用しています。キューのemptyメソッドで「work_queue.empty()」によりキューが空かどうかを判定します。

キューが空でない場合はgetメソッドで先頭の要素を取り出します。値を取り出せない場合にはEmpty例外が送出されるので、その場合はループをbreakします。

このコード内では「work_queue.task_done()」の部分が重要です。task_doneメソッドはタスクの終了することを知らせるもので、呼び出すとキューの内部の未完了数をマイナスします。「work_queue.join()」は、未完了数が0になると後続処理に進みます。そのため、この部分を書き忘れるとwork_queue.join()以降の処理に進まなくなってしまいますので注意してください。

上記のようにキューをうまく使うことで複数スレッドを用いたデータの並列処理を実現できます。

イベント Event

あるスレッドの結果を待ってから、別スレッドを開始させたいような場合には、threadingモジュールのEventを使用できます。以下の例で見てみましょう。

import logging
import time
from threading import Event, Thread

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def myworker(event: Event):
    # event.set()を待機
    event.wait()

    logging.debug("start")
    time.sleep(5)
    logging.debug("end")


def event_trigger(event: Event):
    logging.debug("start")
    time.sleep(5)
    logging.debug("end")

    # wait状態のスレッドを起動
    event.set()


def main():
    logging.debug("start")

    # イベントを生成
    event = Event()

    # スレッドを生成
    trigger_thread = Thread(target=event_trigger, args=(event,))
    thread_num = 3
    threads = [
        Thread(target=myworker, args=(event,)) for _ in range(thread_num)
    ]

    # イベント後に起動するスレッドを開始
    for thread in threads:
        thread.start()

    # イベントのトリガーとなるスレッドを開始
    trigger_thread.start()

    # スレッドの終了を待機
    trigger_thread.join()
    for thread in threads:
        thread.join()

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
Thread-1 (event_trigger): start
Thread-1 (event_trigger): end
Thread-2 (myworker): start
Thread-4 (myworker): start
Thread-3 (myworker): start
Thread-3 (myworker): end
Thread-4 (myworker): end
Thread-2 (myworker): end
MainThread: end

上記例では、event_trigger関数が動くスレッドが完了してから、後続のmyworker関数が動く3つのスレッドが動作していることが分かります。

Eventは、以下のように生成して関連するスレッドに引数で渡します。

...(省略)...

    # イベントを生成
    event = Event()

    # スレッドを生成
    trigger_thread = Thread(target=event_trigger, args=(event,))
    thread_num = 3
    threads = [
        Thread(target=myworker, args=(event,)) for _ in range(thread_num)
    ]

...(省略)...

後続で動作するスレッドのmyworker関数では、以下のようにwaitメソッドでイベントが発生するのを待機します。

...(省略)...

def myworker(event: Event):
    # event.set()を待機
    event.wait()

    logging.debug("start")
    time.sleep(5)
    logging.debug("end")

...(省略)...

トリガーとなるスレッドのevent_triggerメソッドでは、処理が終わったタイミングでsetメソッドを呼び出します。

...(省略)...

def event_trigger(event: Event):
    logging.debug("start")
    time.sleep(5)
    logging.debug("end")

    # wait状態のスレッドを起動
    event.set()

...(省略)...

setメソッドが呼び出されると、waitメソッドで待機していたスレッドが起動します。このようにすることでスレッドの順序関係を制御しながら処理を進めることが可能です。

ある処理Aを実行して結果を作ってから、後続の処理Bで処理Aの結果を使って分散処理するような順序関係を持った機能を開発するときにはEventを使うのが便利です。

コンディション Condition

Eventに似たものとしてthreadingモジュールにはConditionというものがあります。Eventとの違いは、Conditionはロックに関連付けて動作するという点です。

以下は、先ほどのEventの例をConditionで書いた例です。

import logging
import time
from threading import Condition, Thread

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def myworker(condition: Condition):
    with condition:
        # condition.notify_all()を待機
        condition.wait()

        logging.debug("start")
        time.sleep(5)
        logging.debug("end")


def condition_trigger(condition: Condition):
    with condition:
        logging.debug("start")
        time.sleep(5)
        logging.debug("end")
        
        # wait状態のスレッドを起動
        condition.notify_all()


def main():
    logging.debug("start")

    # コンディションを生成
    condition = Condition()

    # スレッドを生成
    trigger_thread = Thread(target=condition_trigger, args=(condition,))
    thread_num = 3
    threads = [
        Thread(target=myworker, args=(condition,)) for _ in range(thread_num)
    ]

    # トリガーとなるスレッド実行後に起動するスレッドを開始
    for thread in threads:
        thread.start()

    # トリガーとなるスレッドを開始
    trigger_thread.start()

    # スレッドの終了を待機
    trigger_thread.join()
    for thread in threads:
        thread.join()

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
Thread-1 (condition_trigger): start
Thread-1 (condition_trigger): end
Thread-3 (myworker): start
Thread-3 (myworker): end
Thread-2 (myworker): start
Thread-2 (myworker): end
Thread-4 (myworker): start
Thread-4 (myworker): end
MainThread: end

先ほどのEventではトリガーとなるイベントが完了した際に、後続のスレッドが同時に開始しましたが、Conditionを使った例では、待機していた後続スレッドが順次実行されます。これは、後続の各スレッドがロックを伴って動作するためです。

Condtionは、Eventと使い方はほとんど同じです。Eventを使った際には、waitメソッドとsetメソッドを使用しましたが、Conditionでは、waitメソッドとnotify_allメソッドを使う点とwith句を使って使用する点で違いがあります。

...(省略)...

def myworker(condition: Condition):
    with condition:
        # condition.notify_all()を待機
        condition.wait()

        logging.debug("start")
        time.sleep(5)
        logging.debug("end")


def condition_trigger(condition: Condition):
    with condition:
        logging.debug("start")
        time.sleep(5)
        logging.debug("end")
        
        # wait状態のスレッドを起動
        condition.notify_all()

...(省略)...

このようにスレッドの順序関係を制御しつつ、ロックを併用するような場合には、Conditionを使用するとよいでしょう。

バリア Barrier

Event, Conditionと似たようなもう一つの機能として、threadingモジュールにはBarrierというものがあります。Barrierは、互いを待つ必要がある固定数のスレッドで同期するために使用できます。

例えば、サーバーとクライアントが同時に起動していないと後続処理を動かせないような場合が単純な例です。以下の例で見てみましょう。

import logging
import time
from threading import Barrier, Thread

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")


def server(barrier: Barrier):
    logging.debug("start")

    # バリア数スレッドが起動するまで待機
    barrier.wait()

    while True:
        logging.debug("execution")
        time.sleep(5)


def client(barrier: Barrier):
    time.sleep(5)
    logging.debug("start")

    # バリア数スレッドが起動するまで待機
    barrier.wait()

    while True:
        logging.debug("execution")
        time.sleep(5)


def main():
    logging.debug("start")

    # バリアを生成
    barrier = Barrier(2)

    # スレッドの生成
    server_thread = Thread(target=server, args=(barrier,))
    client_thread = Thread(target=client, args=(barrier,))

    # スレッドを開始
    server_thread.start()
    client_thread.start()

    # スレッドの終了を待機
    server_thread.join()
    client_thread.join()

    logging.debug("end")


if __name__ == "__main__":
    main()
【実行結果】
MainThread: start
Thread-1 (server): start
Thread-2 (client): start
Thread-2 (client): execution
Thread-1 (server): execution
Thread-1 (server): execution
Thread-2 (client): execution
...(継続)...

上記例では、client関数をtime.sleepで起動をあえて遅くしています。serverが先に開始しますが、Barrierで定義した数にスレッド数が達していないため後続処理はすぐには動きません。client側のスレッドが実行されてBarrierで定義したスレッド数に達した時にserver, client双方が動き出します。なお、上記スレッドは起動した後は動き続ける無限ループになっているので強制終了してください。

使い方はEventやConditionとほとんど同じですが、引数として動作を開始する条件となるスレッド数を引数に指定します。

from threading import Barrier, Thread

...(省略)...

    # バリアを生成
    barrier = Barrier(2)

...(省略)...

各スレッドで動くserver, client関数については、開始の時点で以下のようにwaitメソッドを呼び出します。

...(省略)...

def server(barrier: Barrier):
    logging.debug("start")

    # バリア数スレッドが起動するまで待機
    barrier.wait()

    while True:
        logging.debug("execution")
        time.sleep(5)


def client(barrier: Barrier):
    time.sleep(5)
    logging.debug("start")

    # バリア数スレッドが起動するまで待機
    barrier.wait()

    while True:
        logging.debug("execution")
        time.sleep(5)

...(省略)...

waitメソッドがBarrierに指定した回数呼び出された際に、wait以降の処理が動作を開始します。

上記例のように、サーバーとクライアントを協調して動かさないといけないようなケースではBarrierを使用するとよいでしょう。

まとめ

Pythonでマルチスレッド処理を行うためのthreadingモジュールの使い方について解説しました。

threadingモジュールでは、マルチスレッド処理を簡単実現できるようになっています。さらに、ロック(Lock, RLock)やセマフォ(Semaphore)といった排他制御や、スレッドを制御するための仕組み(Event, Condition, Barrier)も用意されています。また、キューを提供するqueueモジュールのQueueと組み合わせることでキューによるデータの並列処理も実現できます。

Pythonでは、グローバルインタープリタロック(GIL)のため、CPUに負荷のかかるCPUバウンドな処理ではマルチスレッドはあまり意味を持ちません。しかし、I/Oに負荷のかかるI/Oバウンドな処理などでは効果がありますので、マルチスレッド処理の方法を知っておくことには意味があります。

並列処理は正直難しい領域であると思いますが、是非マルチスレッド処理について触ってみて使えるようになってもらえたらと思います。

Note

threadingモジュールの公式ドキュメントはこちらを参照してください。