Pythonでマルチスレッド処理を行うためのthreading
モジュールの使い方について解説します。
Contents
Pythonによるマルチスレッド処理 threading
マルチスレッド処理とは、同一のメモリ空間内で、複数のスレッドで並列処理することを言います。Pythonには、マルチスレッド処理のためのthreading
モジュールが提供されています。
Pythonの実装には、CPython、Jython、IronPythonなどの複数の実装がありますが、最も広く使用されているPythonの実装は、C言語をベースにしたCPythonです。
CPythonの実装には、マルチスレッドを多くの状況で効果的に使用できないという大きな制限が存在します。これは、グローバルインタープリタロック(GIL)として知られています。GILは、全てのPython実装に共通する性質ではなく、C言語で実装されたCPython特有のものであることを理解しておくべきです。例えば、Javaで実装されたJythonにはGILは存在しません。
GILの存在下で、Pythonオブジェクトにアクセスする際の処理は、このグローバルなロックにより、1つのスレッドに限定されます。そのため、PythonではCPUバウンドな処理において、マルチスレッドを利用しても、スレッドがシリアル化されて順次実行されるため、高速化は期待しづらいです。この場合、マルチスレッドではなく、マルチプロセスの実装を検討した方が良い場合があります。
しかし、Pythonのマルチスレッド処理が全く意味がないというわけではありません。GILを避けるケースもあり、I/Oバウンドな処理、例えばDBからの応答を待つような場面では、マルチスレッドが効果的です。このようなケースでは、マルチスレッド処理の知識は非常に価値があります。
本記事では、threading
モジュールを用いたマルチスレッド処理の基本について紹介します。
なお、Pythonでマルチプロセスを実現するmultiprocessing
モジュールの使い方については「multiprocessingによるマルチプロセスの基本」を参考にしてください。
並行処理や並列処理、I/OバウンドやCPUバウンドとマルチスレッド、マルチプロセスの関係について「並行・並列処理、I/Oバウンド・CPUバウンドを理解する」でまとめていますので興味があれば参考にしてください。
threadingの使い方
基本的な使い方 Thread
threading
モジュールを使って、複数のスレッドを生成し、処理を実行する基本的な使い方を以下の例でみていきましょう。
import logging import time from threading import Thread logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s") def myworker1(): logging.debug("start") time.sleep(5) logging.debug("end") def myworker2(): logging.debug("start") time.sleep(5) logging.debug("end") def main(): logging.debug("start") # スレッドの生成 thread1 = Thread(target=myworker1) thread2 = Thread(target=myworker2) # スレッドの開始 thread1.start() thread2.start() # スレッドの終了を待機 thread1.join() thread2.join() logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainThread: start Thread-1 (myworker1): start Thread-2 (myworker2): start Thread-1 (myworker1): end Thread-2 (myworker2): end MainThread: end
上記例は、myworker1
とmyworker2
という関数を別のスレッドで並列処理するプログラムになっています。実行してみていただくとThread-1とThread-2が動くことが分かるかと思います。上記のプログラムでのスレッドを簡単に絵に書いてみると以下のようになっています。
スレッドの生成は、threadingモジュールのThread
を使います。以下のようにtarget
引数に各スレッドで実行する関数を渡します。
# スレッドの生成 thread1 = Thread(target=myworker1) thread2 = Thread(target=myworker2)
スレッドの開始は、それぞれでstart
メソッドを実行します。
# スレッドの開始 thread1.start() thread2.start()
各スレッドの終了を待機するのがjoin
メソッドです。スレッドが終了したら、joinより下のコードが実行されます。
# スレッドの終了を待機 thread1.join() thread2.join()
スレッド内の出力では以下の部分で定義しているようにlogging
を使用しています。logging
のformat
でthreadName
を指定すると分かりやすく表示できます。
logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")
logging
については「loggingの基本的な使い方」でまとめていますので興味があれば参考にしてください。
上記のようにThread
を使うことで簡単にマルチスレッド処理を書いていくことができます。
スレッド名称をつけたい場合
上記例では、スレッド名称(threadName
)が「Thread-1」「Thread-2」というようになっていました。スレッドに個別に名称をつけたい場合には、Thread
生成時に以下のようにname
引数で指定します。(変更のない部分は省略します)
...(省略)... # スレッドの生成 thread1 = Thread(target=myworker1, name="myworker1") thread2 = Thread(target=myworker2, name="myworker2") ...(省略)...
【実行結果】 MainThread: start myworker1: start myworker2: start myworker1: end myworker2: end MainThread: end
name
引数で指定することで、logging
で出力しているthreadName
の部分は指定した名称に変わります。
スレッドで動作する関数への引数の渡し方
スレッドで動作する関数へ引数を渡したい場合には、Thread
の生成時に以下のようにargs
引数とkwargs
引数を使用します。(変更のない部分は省略します)
...(省略)... def myworker1(x: int, y: int): logging.debug("start") logging.debug(f"x: {x}, y:{y}") time.sleep(5) logging.debug("end") ...(省略)... # スレッドの生成 thread1 = Thread(target=myworker1, args=(10,), kwargs={"y": 20}) ...(省略)...
【実行結果】 MainThread: start Thread-1 (myworker1): start Thread-1 (myworker1): x: 10, y:20 Thread-2 (myworker2): start Thread-1 (myworker1): end Thread-2 (myworker2): end MainThread: end
上記例では、myworker1
の方の関数に引数としてint
のx
, y
があります。この関数に引数を渡す場合には、args
にタプルで位置引数で指定するか、kwargs
に辞書でキーワード引数として指定することができます。args
はタプルなので、1つ引数を渡す場合は上記の(10,)
のようにカンマ(,)がいるので注意しましょう。
実行結果を見てもわかるようにThread-1の方では、メインスレッドから引数として渡した情報を表示できていることが分かります。
デーモンとしてのスレッド実行方法
スレッドを「デーモンスレッド」にする場合には、以下のようにdaemon
引数にTrue
を設定します。デフォルトはFalse
です。
デーモンとは、メインメモリ上に常駐して機能を提供するプログラムのことで、UNIX系OSで一般的な呼び方です。デーモンでない生存中のスレッドが全てなくなるとPythonプログラムは終了します。
import logging import time from threading import Thread logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s") def myworker1(): logging.debug("start") time.sleep(10) logging.debug("end") def myworker2(): logging.debug("start") time.sleep(5) logging.debug("end") def main(): logging.debug("start") # スレッドの生成 # thread1をデーモンに設定 thread1 = Thread(target=myworker1, daemon=True) # thread2はそのまま thread2 = Thread(target=myworker2) # スレッドの開始 thread1.start() thread2.start() # スレッドの終了まで待機 # thread1.join() thread2.join() logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainThread: start Thread-1 (myworker1): start Thread-2 (myworker2): start Thread-2 (myworker2): end MainThread: end
上記例では、Thread-1の方のみdaemon
引数をTrue
にしています。daemon
は、以下のようにThread
生成後にdaemon
プロパティにTrue
を設定しても構いません。
thread1 = Thread(target=myworker1) thread1.daemon = True
これまでの例と少し変わっている点としては、myworker1
のsleep
時間を10sとして、myworker2
よりも後に終わるようにしていることです。また、thread1.join()
をコメントアウトしています。
結果を見てみると分かる通り、デーモンプロセスであるThread-1が終了する前にMainThreadが終了していることが分かります。これは、Thread-2が終了した時点でデーモンでない生存中のスレッドがなくなったためです。
なお、join()
を記載するとスレッド終了まで待機するため、コメントアウトしているthread1.join()
のコメントアウトを外すと以下のようにThread-1が終了してからMainThreadが終了します。
【実行結果】 MainThread: start Thread-1 (myworker1): start Thread-2 (myworker2): start Thread-2 (myworker2): end Thread-1 (myworker1): end MainThread: end
タイマーを用いたスレッド実行
threading
モジュールのTimer
クラスを使用することで、指定した時間遅延の後に関数を実行するスレッドを生成することができます。具体的な使用方法は以下の通りです。
import logging import time from threading import Timer logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s") def myworker(x: int, y: int): logging.debug("start") time.sleep(5) logging.debug("end") def main(): logging.debug("start") # タイマーで5秒後に実行開始 thread = Timer(5, myworker, args=(10,), kwargs={"y": 20}) # スレッドの開始 thread.start() # スレッドの終了まで待機 thread.join() logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainThread: start Thread-1: start Thread-1: end MainThread: end
Timer
は、Thread
のサブクラスであるため、使用方法は非常に似ています。主な違いは、最初の引数としてタイマーを指定する点です。このタイマーは遅延を秒単位で指定します。関数への引数やキーワード引数の渡し方はThread
と全く同じです。
上記例では、「MainThread: start」が表示された後、指定された5秒の遅延が発生し、「Thread-1: start」が表示され、その後の処理が進むことが確認できます。
スレッドの排他制御
マルチスレッド処理では、スレッド間でメモリを共有するためしっかりと排他制御をしないと思わぬ不具合を引き起こします。以下の簡単な例で問題が起こる場合を見てみましょう。
import logging import time from threading import Thread logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s") def mycounter(d): logging.debug("start") tmp = d["x"] time.sleep(5) d["x"] = tmp + 1 logging.debug(f"end {d}") def main(): logging.debug("start") data = {"x": 0} # スレッドの生成 thread_num = 5 threads = [ Thread(target=mycounter, args=(data,)) for _ in range(thread_num) ] # スレッドの開始 for thread in threads: thread.start() # スレッドの終了まで待機 for thread in threads: thread.join() logging.debug(data) logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainThread: start Thread-1 (mycounter): start Thread-2 (mycounter): start Thread-3 (mycounter): start Thread-4 (mycounter): start Thread-5 (mycounter): start Thread-1 (mycounter): end {'x': 1} Thread-2 (mycounter): end {'x': 1} Thread-4 (mycounter): end {'x': 1} Thread-3 (mycounter): end {'x': 1} Thread-5 (mycounter): end {'x': 1} MainThread: {'x': 1} MainThread: end
上記例では、"x"
というキーで値が0
という要素を持っているdata
という辞書を用意しています。スレッドを5つ用意し、mycounter
関数へdata
を渡すことで処理の実行カウントを取ろうとしています。結果としては5
になってもらいたいと想定しましたが、結果として{'x': 1}
という形になってしまっています。
この場合、5つのスレッドは「thread.start()
」でほぼ同じタイミングで開始しているわけですが、その際にローカル変数のtmp
にd["x"]
の値を退避してしまっています。このタイミングでは、5つのスレッドではいずれもtmp=0
です。その後、5秒の処理後にtmp
に1
を足した値を"x"
のキーの値として更新しているため、すべてのスレッドで1
になってしまっています。
これは複数スレッドが同じ辞書に同時にアクセスしてしまっているために発生しています。このような状況を競合状態(race condition)と言います。マルチスレッド処理ではこのような状況にならないように、うまく排他制御をする必要があります。
threading
モジュールでは、排他制御として実行中は他スレッドが入ってこれないようにする仕組みが用意されています。以下でthreading
モジュールで提供されているロック(Lock
、RLock
)とセマフォ(Semaphore
)について紹介していきます。
ロックの取得 Lock
上記例を用いつつ、threading
モジュールのLock
を使ってロックを取得する方法を紹介します。Lock
は以下の例のように使用します。
import logging import time from threading import Lock, Thread logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s") def mycounter(d: dict, lock: Lock): # ロックを取得 with lock: logging.debug("start") tmp = d["x"] time.sleep(1) d["x"] = tmp + 1 logging.debug(f"end {d}") # # with句を使わない場合の以下と同じ # # ロックを取得 # lock.acquire() # # 以下のブロックがロックされる # tmp = d["x"] # time.sleep(1) # d["x"] = tmp + 1 # # ロックをリリース # lock.release() def main(): logging.debug("start") data = {"x": 0} # ロックを生成 lock = Lock() # スレッドの生成 thread_num = 5 threads = [ Thread(target=mycounter, args=(data, lock)) for _ in range(thread_num) ] # スレッドの開始 for thread in threads: thread.start() # スレッドの終了まで待機 for thread in threads: thread.join() logging.debug(data) logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainThread: start Thread-1 (mycounter): start Thread-1 (mycounter): end {'x': 1} Thread-2 (mycounter): start Thread-2 (mycounter): end {'x': 2} Thread-3 (mycounter): start Thread-3 (mycounter): end {'x': 3} Thread-4 (mycounter): start Thread-4 (mycounter): end {'x': 4} Thread-5 (mycounter): start Thread-5 (mycounter): end {'x': 5} MainThread: {'x': 5} MainThread: end
上記実行してみると、スレッドが順にロックを取得し、ロックを保持しているスレッドが処理を行っている間、他のスレッドはロックの解放を待機することが分かります。
ロックを取得するためには、threading
モジュールのLock
クラスを使用します。main
関数の以下部分で、Lock
クラスのインスタンスを生成し、各スレッドに引数として渡しています。
...(省略)... data = {"x": 0} # ロックを生成 lock = Lock() # スレッドの生成 thread_num = 5 threads = [ Thread(target=mycounter, args=(data, lock)) for _ in range(thread_num) ] ...(省略)...
具体的にロックを取得しているのは以下の部分で、Lock
のインスタンスでwith
句を使っています。このようにするとwith
句の中では、ロックが取得され、ブロック内の処理が終わるとロックが解放されます。
# ロックを取得 with lock: logging.debug("start") tmp = d["x"] time.sleep(1) d["x"] = tmp + 1 logging.debug(f"end {d}")
with
を使わないで以下のように書くことも可能です。acquire
メソッドでロックを取得し、release
メソッドでロックを解放します。
# ロックを取得 lock.acquire() # 以下のブロックがロックされる tmp = d["x"] time.sleep(1) d["x"] = tmp + 1 # ロックをリリース lock.release()
特に何か理由がない限りは自動でロックの解放までしてくれるwith
句を使うのが適切かと思いますが、明示的にロックの取得や解放ができることは覚えておいてください。
以上のようにLock
を使うことで、複数スレッドを排他制御しながら処理を進めることができます。
入れ子のロック取得 RLock
Lock
を使うことでロックを取得することができました。しかし、同じスレッド内でLock
を複数回取得しようとすると、デッドロックが発生して処理が止まってしまいます。この問題を解決するために、入れ子上にロックを取得したい場合はRLock
を使用します。
threading
モジュールのRLock
クラスを使って、入れ子上のロックを取得する方法を以下に示します。
import logging import time from threading import RLock, Thread logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s") def mycounter(d: dict, lock: RLock): # ロックを取得 with lock: tmp = d["x"] time.sleep(1) d["x"] = tmp + 1 # ロック内で再度ロックを取得する with lock: tmp = d["x"] time.sleep(1) d["x"] = tmp + 1 def main(): logging.debug("start") data = {"x": 0} # ロックを生成 lock = RLock() # スレッドの生成 thread_num = 5 threads = [ Thread(target=mycounter, args=(data, lock)) for _ in range(thread_num) ] # スレッドの開始 for thread in threads: thread.start() # 終了を待機 for thread in threads: thread.join() logging.debug(data) logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainThread: start Thread-1 (mycounter): start Thread-1 (mycounter): end {'x': 2} Thread-2 (mycounter): start Thread-2 (mycounter): end {'x': 4} Thread-3 (mycounter): start Thread-3 (mycounter): end {'x': 6} Thread-4 (mycounter): start Thread-4 (mycounter): end {'x': 8} Thread-5 (mycounter): start Thread-5 (mycounter): end {'x': 10} MainThread: {'x': 10} MainThread: end
RLock
の使用方法はLock
とほとんど同じですが、同一スレッド内で複数回ロックを取得してもデッドロックが発生しない点が異なります。
このような入れ子状のロックでは、RLock
が連続してロックを取得し、リリースを行ってくれるため、処理が継続できます。この特性はLock
にはありませんので、注意が必要です。
セマフォ Semaphore
ロックの他に排他制御の仕組みとしてセマフォがあります。セマフォは、特定のリソース上で同時に動作できるプロセスやスレッドの数を制御するもので、セマフォにより同時実行数を制限できます。
Pythonのthreading
モジュールでは、このセマフォ機能をSemaphore
クラスを使って実現できます。以下の例では、仮想的にデータベースへのアクセス数をセマフォを用いて一定数に制限する例を示しています。
import logging import sqlite3 from threading import Semaphore, Thread logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s") def dbaccess(dbname: str, sema: Semaphore): # セマフォを使用 with sema: logging.debug("start") # DB接続 conn = sqlite3.connect(dbname) curs = conn.cursor() # DBを検索 select_all_sql = "SELECT * FROM person" curs.execute(select_all_sql) rows = curs.fetchall() # DBをクローズ curs.close() conn.close() logging.debug(f"end: {rows}") def main(): logging.debug("start") # データベース名 dbname = "test.db" # 最大コネクション数 max_connections = 3 # セマフォを生成 sema = Semaphore(max_connections) # スレッドの生成 thread_num = 5 threads = [ Thread(target=dbaccess, args=(dbname, sema)) for _ in range(thread_num) ] # スレッドの開始 for thread in threads: thread.start() # 終了を待機 for thread in threads: thread.join() logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainThread: start Thread-1 (dbaccess): start Thread-2 (dbaccess): start Thread-3 (dbaccess): start Thread-2 (dbaccess): end: [(1, 'TARO', 30)] Thread-4 (dbaccess): start Thread-3 (dbaccess): end: [(1, 'TARO', 30)] Thread-5 (dbaccess): start Thread-1 (dbaccess): end: [(1, 'TARO', 30)] Thread-4 (dbaccess): end: [(1, 'TARO', 30)] Thread-5 (dbaccess): end: [(1, 'TARO', 30)] MainThread: end
上記の例では、test.db
という仮想的なsqliteのDBを用意して、Semaphore
を使って最大コネクション数を3に制御し、それぞれのスレッドでSELECTを実行しています。
生成しているスレッド数は5ですが、実行を開始するとまずThread-1, Thread-2, Thread-3の3つのスレッドが開始します。その後、Thread-2が終了するとThread-4が開始し、Thread-3が終了するとThread-5が開始します。このように、同時に起動しているスレッド数は3に制限されています。
Semaphore
の使用方法は、外見上はLock
やRLock
の使い方と似ていますが、動作としては異なります。Semaphore
は同時実行数を制御するのに対し、Lock
やRLock
は排他制御を行います。特に、Semaphore
の生成時に同時実行の最大数を指定する点は、その特性を示す重要な部分です。
# 最大コネクション数 max_connections = 3 # セマフォを生成 sema = Semaphore(max_connections)
セマフォは、同時実行数に制限が必要な場面、特にリソースに容量の制限がある場面で使用されることが多いです。
有限セマフォ BoundedSemaphore
通常のSemaphore
では、acquire
とrelease
を使ってセマフォの数を操作できます。aquire
が実行されるとセマフォの数が減少し、release
が実行されるとセマフォの数が増加することで同時実行数を制御しています。
通常のSemaphoreは、セマフォ値が0を下回ることができない一方で、初期値を超えることも許容されています。つまり、release
を複数回実行すると、セマフォの値が初期値を超えることができます。
from threading import Semaphore # 最大数(初期値) max_sema = 2 # セマフォを生成 sema = Semaphore(max_sema) # 初期値 value=2 print(sema) # value=1 sema.acquire() print(sema) # value=0 sema.acquire() print(sema) # value=1 sema.release() print(sema) # value=2 sema.release() print(sema) # value=3 ← 初期値の最大を超える sema.release() print(sema)
【実行結果】 <threading.Semaphore at 0x1e4062b5790: value=2> <threading.Semaphore at 0x1e4062b5790: value=1> <threading.Semaphore at 0x1e4062b5790: value=0> <threading.Semaphore at 0x1e4062b5790: value=1> <threading.Semaphore at 0x1e4062b5790: value=2> <threading.Semaphore at 0x1e4062b5790: value=3>
有限な資源を使う場合や、初期値を超えてセマフォをリリースしないように制御したい場合は、BoundedSemaphore
を使用すると良いでしょう。BoundedSemaphore
は、セマフォの数が初期値を超過するとValueError
を送出します。
from threading import BoundedSemaphore # 最大数(初期値) max_sema = 2 # セマフォを生成 sema = BoundedSemaphore(max_sema) # 初期値 value=2 print(sema) # value=1 sema.acquire() print(sema) # value=0 sema.acquire() print(sema) # value=1 sema.release() print(sema) # value=2 sema.release() print(sema) # value=3 ← 初期値の最大を超える sema.release() print(sema)
【実行結果】 <threading.BoundedSemaphore at 0x28136f56350: value=2/2> <threading.BoundedSemaphore at 0x28136f56350: value=1/2> <threading.BoundedSemaphore at 0x28136f56350: value=0/2> <threading.BoundedSemaphore at 0x28136f56350: value=1/2> <threading.BoundedSemaphore at 0x28136f56350: value=2/2> Traceback (most recent call last): ...(省略)... ValueError: Semaphore released too many times
BoundedSemaphore
の使い方は、通常のSemaphore
と同じなので先ほどのSemaphore
の例は以下のように書き換えることができます。
...(省略)... def dbaccess(dbname: str, sema: BoundedSemaphore): # 有限セマフォを使用 with sema: logging.debug("start") ...(省略)... logging.debug(f"end: {rows}") def main(): ...(省略)... # 有限セマフォを生成 sema = BoundedSemaphore(max_connections) ...(省略)...
上記の例では、with
句を使っていますので、意図的にセマフォ値の超過を起こすことは考えにくいですが、セマフォの値の上限を確実に制限しながらセマフォを使いたい場合は、BoundedSemaphore
を選択すると良いでしょう。
例で使っているsqliteのデータベースは以下のプログラムで簡単に作成できます。sqlite自体の使い方は「SQLiteの基本的な使い方」にまとめていますので興味があれば参考にしてください。
import sqlite3 # データベースに接続する conn = sqlite3.connect("test.db") # カーソルを作成する curs = conn.cursor() print("===== CREATE TABLE =====") # テーブルを作成する curs.execute( "CREATE TABLE IF NOT EXISTS person (" "id INTEGER PRIMARY KEY AUTOINCREMENT" ",name VARCHAR" ",age INTEGER)" ) insert_sql = "INSERT INTO person(name, age) VALUES(?, ?)" curs.execute(insert_sql, ("TARO", 30)) # コミットする conn.commit() # カーソルとコネクションをクローズする curs.close() conn.close()
複数スレッドの制御
複数スレッドを制御するための方法としてキュー(queue.Queue
)、イベント(Event
)、コンディション(Condition
)、バリア(Barrier
)という方法について紹介します。
キューを用いたスレッド制御 queue.Queue
複数スレッドを用いてデータに対して並列処理したい場合、固定数のスレッドプールを用意し、先入れ先出し(First In First Out: FIFO)のキューを使って順次データを取り出して処理するのが一般的な方法です。
キューを使った制御を行う際にはqueue
モジュールのQueue
を使用できます。スレッドプールとキューを使った制御の例を以下で見ていきましょう。
import logging import time from queue import Empty, Queue from threading import Thread logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s") # スレッドプールのサイズ THREAD_POOL_SIZE = 3 def myworker(work_queue): logging.debug("start") # キューが空でない限り繰り返す while not work_queue.empty(): try: item = work_queue.get_nowait() except Empty: break else: time.sleep(1) logging.debug(item) # タスク終了 ↓を書かないとwork_queue.join()で終了を区別できない work_queue.task_done() logging.debug("end") def main(): logging.debug("start") # スレッドで共有するキューを用意する work_queue = Queue() # 表示したい値のリスト vals = [i for i in range(10)] # キューに投入する for val in vals: work_queue.put(val) # スレッドプール数のスレッドを用意 threads = [ Thread(target=myworker, args=(work_queue,)) for _ in range(THREAD_POOL_SIZE) ] # スレッドの開始 for thread in threads: thread.start() # キューが空になるまで待機 work_queue.join() # 終了を待機 for thread in threads: thread.join() logging.debug("end") if __name__ == "__main__": main()
【実行結果】 Thread-1 (myworker): start Thread-2 (myworker): start Thread-3 (myworker): start Thread-1 (myworker): 0 Thread-2 (myworker): 1 Thread-3 (myworker): 2 Thread-1 (myworker): 3 Thread-3 (myworker): 5 Thread-2 (myworker): 4 Thread-1 (myworker): 6 Thread-2 (myworker): 8 Thread-2 (myworker): end Thread-3 (myworker): 7 Thread-3 (myworker): end Thread-1 (myworker): 9 Thread-1 (myworker): end
上記の例では、0~9までの数値リストを用意し、3つのスレッドに分けて表示しています。処理対象の数値データは以下の部分でキューを用意して投入しています。
from queue import Empty, Queue ...(省略)... # スレッドで共有するキューを用意する work_queue = Queue() # 表示したい値のリスト vals = [i for i in range(10)] # キューに投入する for val in vals: work_queue.put(val) ...(省略)...
キューを使用する際には、queue
モジュールからQueue
をインポートして生成します。キューに値を挿入するには、put
メソッドで追加します。
スレッドプールのスレッドを用意している部分を抜粋したのが以下の部分になります。
# スレッドプールのサイズ THREAD_POOL_SIZE = 3 ...(省略)... # スレッドプール数のスレッドを用意 threads = [ Thread(target=myworker, args=(work_queue,)) for _ in range(THREAD_POOL_SIZE) ] # スレッドの開始 for thread in threads: thread.start() # キューが空になるまで待機 work_queue.join() # 終了を待機 for thread in threads: thread.join() ...(省略)...
スレッドプールサイズの数だけスレッドを生成し、引数として作成したデータのキューを渡します。スレッドの開始については今までの通りです。キューについては空になるまで待機するjoin
メソッドを「work_queue.join()
」のように呼び出しています。
では、各スレッド動作でキューを使う部分を見ていきましょう。
from queue import Empty, Queue ...(省略)... def myworker(work_queue: Queue): logging.debug("start") # キューが空でない限り繰り返す while not work_queue.empty(): try: item = work_queue.get_nowait() except Empty: break else: time.sleep(1) logging.debug(item) # タスク終了 ↓を書かないとwork_queue.join()で終了を区別できない work_queue.task_done() logging.debug("end") ...(省略)..
各スレッドでは、キューが空でない限り繰り返すwhile
ループを使用しています。キューのempty
メソッドで「work_queue.empty()
」によりキューが空かどうかを判定します。ただし、キューの状態が他のスレッドにより変更される可能性があります。そのため、この判定のみに依存するのは避けるべきです。
キューが空でない場合はget_nowait
メソッドで先頭の要素を取り出します。このメソッドは、キューが空の場合には直ちにEmpty
例外を送出します。そのため、例外処理を用いてキューからの取得が失敗した場合の処理を記述しています。
このコード内では「work_queue.task_done()
」の部分が重要です。task_done
メソッドはタスクの終了を知らせるもので、呼び出すとキューの内部の未完了数をマイナスします。「work_queue.join()
」は、未完了数が0になると後続処理に進みます。そのため、この部分を書き忘れるとwork_queue.join()
以降の処理に進まなくなってしまいますので注意してください。
上記のようにキューをうまく使うことで複数スレッドを用いたデータの並列処理を実現できます。
イベント Event
あるスレッドの結果を待ってから、別スレッドを開始させたいような場合には、threading
モジュールのEvent
を使用できます。以下の例で見てみましょう。
import logging import time from threading import Event, Thread logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s") def myworker(event: Event): # event.set()を待機 event.wait() logging.debug("start") time.sleep(5) logging.debug("end") def event_trigger(event: Event): logging.debug("start") time.sleep(5) logging.debug("end") # wait状態のスレッドを起動 event.set() def main(): logging.debug("start") # イベントを生成 event = Event() # スレッドを生成 trigger_thread = Thread(target=event_trigger, args=(event,)) thread_num = 3 threads = [ Thread(target=myworker, args=(event,)) for _ in range(thread_num) ] # イベント後に起動するスレッドを開始 for thread in threads: thread.start() # イベントのトリガーとなるスレッドを開始 trigger_thread.start() # スレッドの終了を待機 trigger_thread.join() for thread in threads: thread.join() logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainThread: start Thread-1 (event_trigger): start Thread-1 (event_trigger): end Thread-2 (myworker): start Thread-4 (myworker): start Thread-3 (myworker): start Thread-3 (myworker): end Thread-4 (myworker): end Thread-2 (myworker): end MainThread: end
上記例では、event_trigger
関数が動くスレッドが完了すると、3つのmyworker
関数のスレッドが同時に開始されることが分かります。
Event
は、以下のように生成して関連するスレッドに引数として渡します。
...(省略)... # イベントを生成 event = Event() # スレッドを生成 trigger_thread = Thread(target=event_trigger, args=(event,)) thread_num = 3 threads = [ Thread(target=myworker, args=(event,)) for _ in range(thread_num) ] ...(省略)...
後続で動作するスレッドのmyworker
関数では、イベントが設定されるまでwait
メソッドで待機します。
...(省略)... def myworker(event: Event): # event.set()を待機 event.wait() logging.debug("start") time.sleep(5) logging.debug("end") ...(省略)...
トリガーとなるスレッドのevent_trigger
関数では、処理が終わったタイミングでset
メソッドを呼び出します。これにより、wait
メソッドで待機していたスレッドが同時に起動します。
...(省略)... def event_trigger(event: Event): logging.debug("start") time.sleep(5) logging.debug("end") # wait状態のスレッドを起動 event.set() ...(省略)...
さらに、Event
には、clear
メソッドがあり、これを使ってイベントのフラグをリセットすることが可能です。これは、特定の状態で再びスレッドを待機させたい場合などに利用できます。
このようにEvent
を使用することで、特定の順序でスレッドの実行を制御することができます。特に、ある処理Aを完了させてから、その結果を利用して別のスレッドで処理Bを実行したい場合などにEvent
を使用すると非常に便利です。
コンディション Condition
Event
に似たものとしてthreading
モジュールにはCondition
というものがあります。Event
との主な違いは、Condition
は内部的なロックを持っており、一度に1つのスレッドのみがCondition
のブロック内を実行できるという点です。
以下は、先ほどのEvent
の例をCondition
で書いた例です。
import logging import time from threading import Condition, Thread logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s") def myworker(condition: Condition): with condition: # condition.notify_all()を待機 condition.wait() logging.debug("start") time.sleep(5) logging.debug("end") def condition_trigger(condition: Condition): with condition: logging.debug("start") time.sleep(5) logging.debug("end") # wait状態のスレッドを起動 condition.notify_all() def main(): logging.debug("start") # コンディションを生成 condition = Condition() # スレッドを生成 trigger_thread = Thread(target=condition_trigger, args=(condition,)) thread_num = 3 threads = [ Thread(target=myworker, args=(condition,)) for _ in range(thread_num) ] # トリガーとなるスレッド実行後に起動するスレッドを開始 for thread in threads: thread.start() # トリガーとなるスレッドを開始 trigger_thread.start() # スレッドの終了を待機 trigger_thread.join() for thread in threads: thread.join() logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainThread: start Thread-1 (condition_trigger): start Thread-1 (condition_trigger): end Thread-3 (myworker): start Thread-3 (myworker): end Thread-2 (myworker): start Thread-2 (myworker): end Thread-4 (myworker): start Thread-4 (myworker): end MainThread: end
先ほどのEvent
ではトリガーとなるイベントが完了した際に、後続のスレッドが同時に開始しましたが、Condition
を使った例では、待機していた後続スレッドが順次実行されます。これは、後続の各スレッドがロックを伴って動作するためです。
Condtion
は、Event
と使い方はほとんど同じですが、重要な違いがあります。Event
では、wait
メソッドとset
メソッドを使用しましたが、Condition
では、wait
メソッドとnotify_all
メソッドを使用します。さらに、Condition
オブジェクトは、with
句を使用する必要があります。これにより、内部のロックの取得と解放が自動的に行われます。
...(省略)... def myworker(condition: Condition): with condition: # condition.notify_all()を待機 condition.wait() logging.debug("start") time.sleep(5) logging.debug("end") def condition_trigger(condition: Condition): with condition: logging.debug("start") time.sleep(5) logging.debug("end") # wait状態のスレッドを起動 condition.notify_all() ...(省略)...
このようにスレッドの順序関係を制御しつつ、ロックを併用する場合では、Condition
を使用するとよいでしょう。
バリア Barrier
Event
やCondition
と似たような同期機構として、threading
モジュールにはBarrier
というものがあります。Barrier
は、複数のスレッドが特定の同期点に達するまで待機し、その後すべてのスレッドが同時に動作を再開するための機構です。
つまり、Barrier
で指定した数のスレッドがすべてwait
メソッドを呼び出すまで、いずれのスレッドも進行できなくなります。以下の例で見てみましょう。
import logging import time from threading import Barrier, Thread logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s") def server(barrier: Barrier): logging.debug("start") # バリア数スレッドが起動するまで待機 barrier.wait() while True: logging.debug("execution") time.sleep(5) def client(barrier: Barrier): time.sleep(5) logging.debug("start") # バリア数スレッドが起動するまで待機 barrier.wait() while True: logging.debug("execution") time.sleep(5) def main(): logging.debug("start") # バリアを生成 barrier = Barrier(2) # スレッドの生成 server_thread = Thread(target=server, args=(barrier,)) client_thread = Thread(target=client, args=(barrier,)) # スレッドを開始 server_thread.start() client_thread.start() # スレッドの終了を待機 server_thread.join() client_thread.join() logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainThread: start Thread-1 (server): start Thread-2 (client): start Thread-2 (client): execution Thread-1 (server): execution Thread-1 (server): execution Thread-2 (client): execution ...(継続)...
上記例では、client
関数の起動はtime.sleep
で故意に遅らせています。server
関数が先に開始しますが、Barrier
で定義したスレッド数に達していないため、後続の処理はすぐには動きません。
client
側のスレッドが起動し、Barrier
で指定したスレッド数に達した時に、server
およびclient
の両方が動作を開始します。また、Barrier
は指定した数のスレッドがwait()
を呼び出した後、自動的にリセットされ、再度同じ数のスレッドがwait()
を呼び出すのを待機する状態となります。
なお、上記スレッドの動作は、起動した後は無限ループになっているため手動で終了する必要があります。この点に注意してください。
使い方に関して、Event
やCondition
との違いは、Barrier
の初期化時に動作を開始する条件となるスレッド数を指定する点です。
from threading import Barrier, Thread ...(省略)... # バリアを生成 barrier = Barrier(2) ...(省略)...
各スレッドでは、以下のようにwait
メソッドを呼び出します。
...(省略)... def server(barrier: Barrier): logging.debug("start") # バリア数スレッドが起動するまで待機 barrier.wait() while True: logging.debug("execution") time.sleep(5) def client(barrier: Barrier): time.sleep(5) logging.debug("start") # バリア数スレッドが起動するまで待機 barrier.wait() while True: logging.debug("execution") time.sleep(5) ...(省略)...
上記のように、複数のスレッドが同期して動作を開始する必要がある場合、例えば、サーバーとクライアントを協調して動作するようなケースではBarrier
を使用するとよいでしょう。
multiprocessing.dummyによるマルチスレッド
Pythonには、マルチプロセスを実現するmultiprocessing
モジュールがありますが、同じAPIを利用してマルチスレッドを実現したい場合には、multiprocessing.dummy
が便利です。これにより、コードの大部分を変更することなく、プロセスの代わりにスレッドを使用して並列処理を行うことができます。
上記でqueue.Queue
を利用して作成したスレッドプールと同じような動作をPool
クラスで実現できます。
import logging import time from multiprocessing.dummy import Pool as ThreadPool logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s") # スレッドプールのサイズ THREAD_POOL_SIZE = 3 def myworker(x: int): logging.debug("start") time.sleep(1) logging.debug(f"end: {x}") def main(): logging.debug("start") # 表示したい値のリスト vals = [i for i in range(10)] # スレッドプールの使用 with ThreadPool(THREAD_POOL_SIZE) as pool: results = pool.map_async(myworker, vals) # map_asyncは非同期なので↓はすぐに表示 logging.debug("execute") # getを実行したら動作する logging.debug(results) results.get() logging.debug("end") if __name__ == "__main__": main()
【実行結果】 MainThread: start MainThread: execute MainThread: <multiprocessing.pool.MapResult object at 0x000002A00C6B1ED0> Thread-1 (worker): start Thread-2 (worker): start Thread-3 (worker): start Thread-1 (worker): end: 0 Thread-1 (worker): start Thread-3 (worker): end: 2 Thread-3 (worker): start Thread-2 (worker): end: 1 Thread-2 (worker): start Thread-1 (worker): end: 3 Thread-1 (worker): start Thread-3 (worker): end: 4 Thread-3 (worker): start Thread-2 (worker): end: 5 Thread-2 (worker): start Thread-1 (worker): end: 6 Thread-1 (worker): start Thread-3 (worker): end: 7 Thread-2 (worker): end: 8 Thread-1 (worker): end: 9 MainThread: end
上記では、Pool
クラスをThreadPool
としてインポートしています。ThreadPool
のwith
句で、map_async
メソッドに実行する関数と引数を渡すことでマルチスレッドで処理を実行できます。実行すると分かりますが、THREAD_POOL_SIZE
に指定した数までのスレッドしか生成されないことが分かります。
Pool
クラスは、multiprocessing
モジュールでも中心的なものです。「multiprocessingによるマルチプロセスの基本」でプロセスプールの説明として使い方を記載していますので、あわせて確認してもらえるとよいかと思います。
上記のように、multiprocessing
モジュールはマルチプロセスとマルチスレッドの両方で共通のAPIを持っています。マルチスレッドかマルチプロセスかは、どちらかが必ず優れていると言えるようなものではありません。解決する問題や要件に合わせて適切な手法を選択し、十分な検討を重ねることが大切です。
まとめ
Pythonでマルチスレッド処理を行うためのthreading
モジュールの使い方について解説しました。
threading
モジュールを使用すると、マルチスレッド処理を簡単に実現できます。このモジュールには、ロック(Lock
, RLock
)やセマフォ(Semaphore
, BoundedSemaphore
)などの排他制御機能、さらにスレッド間の制御を支援するEvent
, Condition
, Barrier
といった仕組みが備わっています。また、queue
モジュールのQueue
と組み合わせることで、キューを活用したデータの並列処理も行うことができます。
また、マルチプロセスを実現するmultiprocessing
モジュールの中に、スレッドを利用するためのmultiprocessing.dummy
が存在しているため、こちらについても紹介しました。
Pythonでは、グローバルインタープリタロック(GIL)の存在により、CPUバウンドな処理でのマルチスレッドの効果は限定的です。しかし、I/Oバウンドな処理でマルチスレッドは有効であり、その使い方を理解することは非常に価値があります。
並列処理は複雑ですが、マルチスレッド処理を実践的に使いこなせるようになることをおすすめします。
threadingモジュールの公式ドキュメントはこちらを参照してください。
上記で紹介しているソースコードについてはgithubにて公開しています。参考にしていただければと思います。