Pythonで、asyncpg
を用いて非同期にPostgreSQLデータベースへアクセスする方法を解説します。
Contents
asyncpgによる非同期データベース処理 (PostgreSQL)
非同期処理とは、複数のタスクが協調しあって処理を実行するプログラミングの実装方法です。非同期プログラミングは多くの支持を得ており、Python 3.5でもasync
やawait
といった非同期プログラミング方法が導入されています。
async
/await
を用いたプログラミングの基本は「async/awaitを用いた非同期プログラミングの基本」でまとめているので参考にしてください。
本記事では、非同期処理での効果が期待できるデータベースアクセスに焦点を当てます。具体的には、asyncpg
を用いて非同期にPostgreSQLデータベースへアクセスする方法を紹介します。
asyncpgとは
asyncpg
とは、PostgreSQLデータベースの非同期処理をサポートするPythonのサードパーティ製ライブラリです。公式ドキュメントはこちらを参照してください。
非同期プログラミングは、I/Oバウンドな処理を実装するためには非常に有益な実装方法です。I/Oバウンドな処理とは、ファイルへの入出力やネットワーク通信等が該当します。
PostgreSQLにアクセスする際には同期的なパッケージとしてpsycopg2
がよく使用されます。psycopg2
の使い方については「psycopg2を用いたPostgreSQLデータベースへのアクセス方法」でまとめているので参考にしてください。
PostgreSQL用の非同期ライブラリとしては他にもaiopg
というパッケージもあります。aiopg
はpsycopg2
をベースにしているため、psycopg2
を利用している人には適しているかもしれません。一方で、asyncpg
は高速であることで知られています。今回はasyncpg
の使い方について説明していきたいと思います。
asyncpgを用いた非同期通信の基本
この記事では「psycopg2を用いたPostgreSQLデータベースへのアクセス方法」で紹介したPostgreSQLデータベースへのアクセスクラスの非同期版クラスを作成してみたいと思います。
asyncpgのインストール
asyncpg
はサードパーティ製ライブラリのため、インストールが必要です。以下のようにpipでインストールをしてください。
pip install asyncpg
インストールが完了したら非同期にPostgreSQLを扱うことが可能です。
サンプルプログラムの前提条件
本記事では、PostgreSQLのインストール自体については説明を省略します。以降で紹介するサンプルプログラムを実行するにあたっての前提条件は以下とします。
- PostgreSQLがローカルPCにインストールされているものとする。
- DB「test」、スキーマ「work」が作成されているものとする。
- ユーザー「test」が作成されており、パスワードは「PAssw0rd」とする。また、workスキーマに対して適切に権限が設定されているものとする。(
GRANT ALL ON SCHEMA work TO test;
)
以降では、設定ファイルの記載も含めて紹介しています。ご利用の環境に合わせて設定ファイルやプログラムは修正して動作させることも可能です。
今回は、asyncpg
の使用方法に焦点を当てるため細かな例外処理を省略していますのでご注意ください。実際のアプリケーションの利用では、例外処理を十分に考慮する必要があります。
asyncpgを用いたPostgreSQLアクセスクラス
asyncpg
を用いてPostgreSQLデータベースへアクセスするためのクラスを作成します。設定ファイルおよびサンプルプログラムをまず紹介します。具体的な詳細については以降で説明していきます。
【dbconfig.ini】DBアクセス設定ファイル
[POSTGRESQL_DB_SERVER] host = localhost port = 5432 dbname = test user = test password = PAssw0rd [POOL] min_size=1 max_size=10
【db_connect_asyncpg.py】非同期DBアクセス用クラス
import configparser import asyncpg class DbConnectAsyncPostgres: # 接続プール connection_pool = None @classmethod async def initialize_connection_pool( cls, config_file="./dbconfig.ini" ) -> None: """接続プールの初期化 Args: config_file: 設定ファイルパス Returns: None """ if cls.connection_pool is None: # コンフィグファイルからデータを取得 config_db = configparser.ConfigParser() config_db.read(config_file) # 接続情報の取得 host = config_db["POSTGRESQL_DB_SERVER"]["host"] port = config_db["POSTGRESQL_DB_SERVER"]["port"] dbname = config_db["POSTGRESQL_DB_SERVER"]["dbname"] user = config_db["POSTGRESQL_DB_SERVER"]["user"] password = config_db["POSTGRESQL_DB_SERVER"]["password"] # プール設定の取得 min_size = int(config_db["POOL"]["min_size"]) max_size = int(config_db["POOL"]["max_size"]) # DSN(Data Source Name)の作成 dsn = f"postgresql://{user}:{password}@{host}:{port}/{dbname}" # 非同期接続プールの初期化 cls.connection_pool = await asyncpg.create_pool( dsn=dsn, min_size=min_size, max_size=max_size, ) @classmethod async def close_connection_pool(cls): """接続プールをクローズする""" if cls.connection_pool: await cls.connection_pool.close() cls.connection_pool = None def __init__(self) -> None: """コンストラクタ""" self.conn = None self.transaction = None async def connect(self) -> None: """DB接続""" if self.connection_pool is None: raise Exception("接続プールが初期化されていません") # 接続プールから非同期でコネクションを取得 self.conn = await self.connection_pool.acquire() async def close(self) -> None: """DBクローズ""" if self.conn: # 接続プールに返却する await self.connection_pool.release(self.conn) self.conn = None async def __aenter__(self): # DB接続 await self.connect() return self async def __aexit__(self, exc_type, exc_val, exc_tb): # DBクローズ await self.close() async def execute_non_query( self, sql: str, bind_var: tuple = None ) -> None: """CREATE/INSERT/UPDATE/DELETEのSQL実行メソッド Args: sql: 実行SQL bind_var: バインド変数 Returns: None """ # SQLの実行 (bind_varがNoneの場合は()を設定し、アンパックして渡す) await self.conn.execute(sql, *(bind_var or ())) async def execute_query( self, sql: str, bind_var: tuple = None, count: int = 0 ) -> list[asyncpg.Record]: """SELECTのSQL実行メソッド Args: sql: 実行SQL bind_var: バインド変数 count: データ取得件数 (0は全ての行を取得) Returns: 結果リスト (asyncpg.Record型) """ stmt = await self.conn.prepare(sql) # データ取得件数が指定されている場合 if count > 0: rows = await stmt.fetch(count, *(bind_var or ())) else: rows = await stmt.fetch(*(bind_var or ())) return rows async def start_transaction(self) -> None: """トランザクションの開始""" # コネクションでトランザクションを開始する self.transaction = self.conn.transaction() # トランザクションを開始する await self.transaction.start() async def commit_transaction(self) -> None: """トランザクションのコミット""" await self.transaction.commit() async def rollback_transaction(self) -> None: """トランザクションのロールバック""" await self.transaction.rollback()
サンプルコードの詳細説明
DB接続設定ファイル dbconfig.ini
データベース(DB)へアクセスする際には、接続のための設定情報が必要です。DBの設定情報は、DBアクセス設定ファイルとしてdbconfig.ini
として用意します。
このファイルは、以降で紹介するプログラム内でconfigparser
モジュールを用いて内容を取得します。configparser
の使用方法については「configparserによる設定ファイル管理」にまとめていますので参考にしてください。
[POSTGRESQL_DB_SERVER] host = localhost port = 5432 dbname = test user = test password = PAssw0rd [POOL] min_size=1 max_size=10
設定ファイルには以下の情報を記載しています。
設定値 | 概要 |
---|---|
host | 具体的なサーバーのホスト名やIPアドレスを記載します。今回はローカルPCを使うためlocalhost としています。 |
port | PostgreSQLへアクセスするためのポートです。PostgreSQLではデフォルトで5432 が使用されます。お使いの環境で変更している場合は修正が必要です。 |
dbname | アクセスするデータベース名を記載します。 |
user | アクセスするユーザーを記載します。 |
password | アクセス時のパスワードを記載します。 |
以下の項目については、データベースへの接続プールに関する設定項目です。
設定値 | 概要 |
---|---|
min_size | 接続プールにおける最小コネクション数を表します。 |
max_size | 接続プールにおける最大コネクション数を表します。 |
今回は簡単のためパスワードは平文のまま記載していますが、セキュリティ上適切ではありません。実際には暗号化等を考慮する必要がある点に注意してください。
非同期DBアクセス用クラス DbConnectAsyncPostgres
DBアクセス用のクラスとしてDbConnectAsyncPostgres
というクラスを定義しています。このクラスをDBを使用するプログラムから呼び出すことで非同期にPostgreSQLへアクセスできます。また、このクラスは接続プールやコンテキストマネージャーに対応しています。
定義メソッドの概要は以下になります。
メソッド名 | 概要 |
---|---|
initialize_connection_pool | 接続プールを作成する初期化メソッドです。設定ファイルから接続状況を読み込み、接続プールを作成します。 このメソッドは classmethod として定義し、アプリケーション起動時に呼び出します。 |
close_connection_pool | 接続プールをクローズするメソッドです。 このメソッドは classmethod として定義し、アプリケーション終了時に呼び出します。 |
__init__ | DbConnectAsyncPostgres クラスのコンストラクタです。 |
connect | 接続プールから非同期にコネクションを取得します。 |
close | データベースのクローズ処理を行います。コネクションは、接続プールへ返却します。 |
__aenter__ | with 句を実行する際に呼び出される非同期に対応したコンテキストマネージャーのメソッドです。connect メソッドを呼び出すことでDB接続を行い、戻り値をas で指定された変数に反映します。 |
__aexit__ | with 句を抜ける時に呼び出される非同期に対応したコンテキストマネージャーのメソッドです。例外が発生した場合には、exc_type 、exc_val 、exc_tb に例外情報が渡されます。例外発生時はロールバックし、例外がなければコミットを行ったうえで、DB接続をクローズします。 |
execute_non_query | CREATE /INSERT /UPDATE /DELETE 処理用のSQL実行メソッドです。SQL文と必要に応じてバインド変数を渡して処理を実行します。 |
execute_query | SELECT 処理用のSQL実行メソッドです。返却値としては辞書のリストを返します。SQL文と必要に応じてバインド変数、データ取得件数を渡して処理を実行します。 |
start_transaction | トランザクションを開始するメソッドです。 |
commit_transaction | コミットを行うメソッドです。 |
rollback_transaction | ロールバックを行うメソッドです。 |
それぞれのメソッドのポイントについて以降で説明していきます。なお、非同期に実行される関数については「async def
」で定義されていることに注意してください。また、各関数内で呼び出す際にも非同期に呼び出す場合には「await
」キーワードを指定することが重要です。
接続プールの初期化:initialize_connection_pool
initialize_connection_pool
は、接続プールの初期化を行います。初期化の際には、設定ファイルからDBの接続情報と接続プールの設定値を取得します。
# 接続プール connection_pool = None @classmethod async def initialize_connection_pool( cls, config_file="./dbconfig.ini" ) -> None: """接続プールの初期化 Args: config_file: 設定ファイルパス Returns: None """ if cls.connection_pool is None: # コンフィグファイルからデータを取得 config_db = configparser.ConfigParser() config_db.read(config_file) # 接続情報の取得 host = config_db["POSTGRESQL_DB_SERVER"]["host"] port = config_db["POSTGRESQL_DB_SERVER"]["port"] dbname = config_db["POSTGRESQL_DB_SERVER"]["dbname"] user = config_db["POSTGRESQL_DB_SERVER"]["user"] password = config_db["POSTGRESQL_DB_SERVER"]["password"] # プール設定の取得 min_size = int(config_db["POOL"]["min_size"]) max_size = int(config_db["POOL"]["max_size"]) # DSN(Data Source Name)の作成 dsn = f"postgresql://{user}:{password}@{host}:{port}/{dbname}" # 非同期接続プールの初期化 cls.connection_pool = await asyncpg.create_pool( dsn=dsn, min_size=min_size, max_size=max_size, )
initialize_connection_pool
は、@classmethod
でクラスメソッドとして定義し、アプリケーション起動時に1度呼び出します。コンストラクタ(__init__
)で定義しないのはインスタンス化のたびに接続プールを作成する必要はないためです。
接続プールを作成するには、asyncpg.create_pool
を使用します。接続時にはDSN(Data Source Name)を指定しますが、postgresql://{user}:{password}@{host}:{port}/{dbname}
という形式をとります。今回の場合、具体的には「postgresql://test:PAssw0rd@localhost:5432/test
」ということになります。
また、min_size
は最小接続数、max_size
は最大接続数を表しており、今回の設定ファイルでは、min_size=1
、max_size=10
としているので、最大10のコネクションを使用することが可能です。
最大接続数を超えた場合の挙動
asyncpg
の接続プールでは、最大コネクションに達した場合に他のコネクションを使用しようとした場合には、コネクションが解放されるまで待ちます。psycopg2
の接続プールでは、最大を超えるコネクションを接続をしようとするとエラーとなるので挙動に違いがあります。
asyncpg
は、可能な限り効率的にリソースを利用しようとしますが、psycopg2
ではそのような状況を許容しません。これは非同期処理を中心に設計されたasyncpg
と、より伝統的な同期処理を主とするpsycopg2
の設計思想の違いがあるためです。
接続プールのクローズ:close_connection_pool
接続プールについては、アプリケーションで不要になったときにclose
を使用してクローズします。
@classmethod async def close_connection_pool(cls): """接続プールをクローズする""" if cls.connection_pool: await cls.connection_pool.close() cls.connection_pool = None
接続プールをクローズする際にはclose
メソッドを使用して、すべての接続をクローズします。connection_pool
にNone
を設定することは必須ではありませんが、ガーベージコレクションの対象となることを示すためにNone
を設定しています。
コンストラクタ:__init__
コンストラクタ(__init__
)では、DBへの接続インスタンスごとにコネクションとトランザクション用の変数を用意しておきます。具体的なコネクション取得は、以降のconnect
メソッドで行います。
def __init__(self) -> None: """コンストラクタ""" self.conn = None self.transaction = None
DB接続:connect
connect
メソッドでは、接続プールからコネクションを取得します。
async def connect(self) -> None: """DB接続""" if self.connection_pool is None: raise Exception("接続プールが初期化されていません") # 接続プールから非同期でコネクションを取得 self.conn = await self.connection_pool.acquire()
コネクションを取得する際には、接続プールから「self.connection_pool.acquire()
」というようにacquire
メソッドを使用してコネクションを取得します。
DBクローズ:close
close
メソッドでは、データベースのクローズ処理を行います。
async def close(self) -> None: """DBクローズ""" if self.conn: # 接続プールに返却する await self.connection_pool.release(self.conn) self.conn = None
コネクションについては、接続プールに返却するためにrelease
メソッドにコネクションを渡します。conn
にNone
を設定することは必須ではありませんが、ガーベージコレクションの対象となることを示すためにNone
を設定しています。
コンテキストマネージャ:__aenter__
及び __aexit__
DbConnectAsyncPostgres
クラスは、with
句を使用できるようにコンテキストマネージャーに対応して実装します。コンテキストマネージャーに対応するためには、具体的には__aenter__
メソッドと__aexit__
メソッドの実装が必要です。
コンテキストマネージャーの考え方については「コンテキストマネージャーの基本」でまとめているので参考にしてください。ただし、コンテキストマネージャに対応する関数は、__aenter__
や__aexit__
というように「a
」がついた非同期用のメソッドになっている違いがあることに注意してください。
async def __aenter__(self): # DB接続 await self.connect() return self async def __aexit__(self, exc_type, exc_val, exc_tb): # DBクローズ await self.close()
__aenter__
メソッドでは、DB接続を行います。これによりwith
句に入った際にDB接続を行い、as
で指定された変数にDBクラスのインスタンスを設定します。
with
句を抜ける際には、__aexit__
メソッドが呼び出されます。この際にclose
メソッドでデータベースのクローズ処理を行います。
asyncpg
では、自動コミットモードを採用しています。そのため、SQLを実行したタイミングで自動でコミットされます。これは1つのSQLをトランザクションと考えていることに相当します。
もし、複数のSQLで1つのトランザクションととらえたい場合には、以降で説明するstart_transaction
やcommit_transaction
、rollback_transaction
を明示的に使用する必要があります。
なお、同期的なパッケージであるpsycopg2
では、自動コミットモードはデフォルトでOFFになっているため、明示的にコミットが必要です。この点で各パッケージには考え方が違うことを覚えておいてください。
SQL実行:execute_non_query
SQL実行用のメソッドとしてexecute_non_query
を用意しています。後述するexecute_query
との違いは返却値がないことで、CREATE
/INSERT
/UPDATE
/DELETE
などのSQLを実行するために使用します。
async def execute_non_query( self, sql: str, bind_var: tuple = None ) -> None: """CREATE/INSERT/UPDATE/DELETEのSQL実行メソッド Args: sql: 実行SQL bind_var: バインド変数 Returns: None """ # SQLの実行 (bind_varがNoneの場合は()を設定し、アンパックして渡す) await self.conn.execute(sql, *(bind_var or ()))
SQLを実行するためにはコネクションのexecute
メソッドに対象のSQLを渡します。また、バインド変数がある場合には引数に指定して実行することができます。
バインド変数は、アンパックして渡す必要がありますがor
を使用して簡潔に記載しています。この記載方法では、bind_var
がNone
の場合は()
が渡されます。
SQL実行:execute_query
SQL実行用のメソッドとしてもう一つexecute_query
を用意しています。上記のexecute_non_query
との違いは、返却値があることです。SELECT
してデータを取得するSQLを実行するために使用します。
async def execute_query( self, sql: str, bind_var: tuple = None, count: int = 0 ) -> list[asyncpg.Record]: """SELECTのSQL実行メソッド Args: sql: 実行SQL bind_var: バインド変数 count: データ取得件数 (0は全ての行を取得) Returns: 結果リスト (asyncpg.Record型) """ stmt = await self.conn.prepare(sql) # データ取得件数が指定されている場合 if count > 0: rows = await stmt.fetch(count, *(bind_var or ())) else: rows = await stmt.fetch(*(bind_var or ())) return rows
SQLを実行するためには、コネクションのfetch
メソッドに対象となるSQLを渡しますが、その前にprepare
メソッドでSQLをstmt
として準備します。
SELECT
結果を全て取得する場合は、stmt.fetch
にバインド変数のみ渡して呼び出します。count
が指定されていいる(>0
)の場合には、count
を引数に指定することでその件数を取得することができます。
なお、返却のrows
は、asyncpg.Record
型のリストとなります。必要に応じて結果をdict()
でPythonの辞書に変換することも可能です。
トランザクションの開始:start_transaction
asyncpg
では、自動コミットモードを採用しています。そのため、SQLを実行したタイミングで自動でコミットされますが、複数のSQLで1つのトランザクションととらえたい場合があります。このような場合には、以下のstart_transaction
を使用します。
async def start_transaction(self) -> None: """トランザクションの開始""" # コネクションでトランザクションを開始する self.transaction = self.conn.transaction() # トランザクションを開始する await self.transaction.start()
コネクションからトランザクションを開始するには、コネクションのtransaction
メソッドにより接続からトランザクションを用意し、start
メソッドでトランザクションを開始します。
コミット:commit_transaction
start_transaction
によりトランザクションを開始した場合には、適切な位置でコミットを明示的に行う必要があります。この場合には、commit_transaction
を使用します。
async def commit_transaction(self) -> None: """トランザクションのコミット""" await self.transaction.commit()
内部的には、トランザクションのcommit
メソッドを呼び出しているだけです。もし、コミット時に他の処理が必要な場合は追加してください。
ロールバック:rollback_transaction
コミット前のデータベースの変更をトランザクション開始時点まで戻したい場合は、ロールバックしてデータベースの状態を元に戻す必要があります。この場合には、rollback_transaction
を使用します。
async def rollback_transaction(self) -> None: """トランザクションのロールバック""" await self.transaction.rollback()
内部的には、トランザクションのrollback
メソッドを呼び出しているだけです。もし、ロールバック時に他の処理が必要な場合は追加してください。
DBアクセスクラスの使い方
上記で定義したDbConnectAsyncPostgres
クラスは、他のプログラムから使用することが可能です。以降ではは、DbConnectPostgres
クラスを使用して、データの追加(INSERT
)、データ検索(SELECT
)の使い方例を紹介します。
基本的な使い方 (自動コミットモードを使用)
データをINSERTする
データを非同期に登録(INSERT
)したい場合には、以下のようにします。
import asyncio from db_connect_asyncpg import DbConnectAsyncPostgres async def insert_data(value1, value2): """データをインサートする (自動コミットモードを使用する場合)""" async with DbConnectAsyncPostgres() as db: insert_sql = ( "INSERT INTO work.sample_table " "(str1, value1, last_update_datetime) " "VALUES($1, $2, current_timestamp)" ) await db.execute_non_query(insert_sql, (value1, value2)) async def main(): # 接続プールを初期化する await DbConnectAsyncPostgres.initialize_connection_pool() # 非同期タスクを並行実行 tasks = [insert_data(f"test_str{i}", 10 * i) for i in range(10)] await asyncio.gather(*tasks) # 接続プールをクローズする await DbConnectAsyncPostgres.close_connection_pool() if __name__ == "__main__": # asyncio.runを使用してメインコルーチンを実行 asyncio.run(main())
以降でポイントを説明していきます。
モジュールのインポート
DBアクセスクラスを使うには、作成したモジュールを以下のようにインポートしてください。
from db_connect_asyncpg import DbConnectAsyncPostgres
接続プールの初期化とクローズ
今回のクラスでは接続プールを使用していますので、使用前に接続プールを初期化する必要があります。
# 接続プールを初期化する await DbConnectAsyncPostgres.initialize_connection_pool() ...(省略)... # 接続プールをクローズする await DbConnectAsyncPostgres.close_connection_pool()
接続プールは、プールからコネクションを取得して使うという考え方であるため、アプリケーション起動時に1度作成し、アプリケーション終了時にクローズするようにしてください。
非同期INSERT関数
非同期なINSERT
用関数としてinsert_data
関数を定義しています。
async def insert_data(value1, value2): """データをインサートする (自動コミットモードを使用する場合)""" async with DbConnectAsyncPostgres() as db: insert_sql = ( "INSERT INTO work.sample_table " "(str1, value1, last_update_datetime) " "VALUES($1, $2, current_timestamp)" ) await db.execute_non_query(insert_sql, (value1, value2))
※[注意] 上記コードについて、使用しているプラグインの影響でうまく表示されず見た目上「VALUES(1,2, current_timestamp)
」の表示になってしまっているかと思いますが、正確には「VALUES($1, $2, current_timestamp)
」です。
今回作成したクラスは、コンテキストマネージャーを使用できるため、with
句を使ってインスタンスを作成することができます。
with
句に入った際に、connect
メソッドにより接続プールからコネクションが取得され、with
句を抜ける際にコネクションは返却されます。asyncpg
は、自動コミットモードとなっているため、execute_non_query
が実行されたタイミングで、すぐにコミットされます。
また、SQLインジェクションを防ぐ手段の一つであるプレースホルダーですが、asyncpg
では値を埋め込むためのSQLのプレースホルダーとして$1
、$2
といった表記を使用し、引数に渡すタプルの値が順に設定されます。psycopg2
では、%s
といった表記であり少し異なっていることに注意が必要です。
非同期タスクの作成と実行
メインコルーチン(main
)の中では、接続プールを生成した後にタスクを作成しています。
# 非同期タスクを並行実行 tasks = [insert_data(f"test_str{i}", 10 * i) for i in range(10)] await asyncio.gather(*tasks)
タスクは、リスト内包表記を使って10個のタスクを用意しています。このリストを非同期に実行する際にasyncio.gather
を使用します。これにより、各タスクは非同期に実行されます。
トランザクションを明確に管理する使い方
asyncpg
は、自動コミットモードを採用しているため、SQLを実行したタイミングで自動的にコミットまたはロールバックが行われます。これは1つのSQLを1トランザクションととらえていることと同じです。
しかし、実際には複数のSQLを1トランザクションとして考えたい場合もあります。そのようなときには、以下のように明示的にトランザクションの開始やコミットを実行することができます。
import asyncio from db_connect_asyncpg import DbConnectAsyncPostgres async def insert_data_with_transaction(value1, value2): """データをインサートする (トランザクションを明示的に管理する場合)""" async with DbConnectAsyncPostgres() as db: try: # トランザクションの開始 await db.start_transaction() # インサートを実行 insert_sql = ( "INSERT INTO work.sample_table " "(str1, value1, last_update_datetime) " "VALUES($1, $2, current_timestamp)" ) await db.execute_non_query(insert_sql, (value1, value2)) # コミット await db.commit_transaction() except Exception as ex: await db.rollback_transaction() print(f"エラー発生: {ex}") async def main(): # 接続プールを初期化する await DbConnectAsyncPostgres.initialize_connection_pool() # 非同期タスクを並行実行 tasks = [ insert_data_with_transaction(f"test_str{i}", 10 * i) for i in range(10) ] await asyncio.gather(*tasks) # 接続プールをクローズする await DbConnectAsyncPostgres.close_connection_pool() if __name__ == "__main__": # asyncio.runを使用してメインコルーチンを実行 asyncio.run(main())
※[注意] 上記コードについて、使用しているプラグインの影響でうまく表示されず見た目上「VALUES(1,2, current_timestamp)
」の表示になってしまっているかと思いますが、正確には「VALUES($1, $2, current_timestamp)
」です。
基本的な使い方は先に説明したプログラムと同様ですが、async with
のブロックの中で明示的にstart_transaction
によりトランザクションを開始してます。
トランザクションを明示的に開始した場合には、execute_non_query
を実行したタイミングではコミットは行われません。そのため、明示的にcommit_transaction
でコミットを行う必要があります。また、try...except
を用いることで例外をキャッチした場合には、rollback_transaction
によりロールバックする構成となっています。
このように明示的にトランザクションの範囲を制御することで、複数のSQLの実行を1トランザクションとして使うことも可能になります。
今回は、INSERT
の例を紹介しましたが、UPDATE
やDELETE
についても同様の考え方で使用することが可能です。
データを検索する
今回作成したクラスでデータを検索(SELECT
)する例についても見てみましょう。複数のSELECT
を非同期に実行する場合には、以下のように実行します。
import asyncio from pprint import pprint from db_connect_asyncpg import DbConnectAsyncPostgres async def select_data(target_id): """データを検索する""" async with DbConnectAsyncPostgres() as db: select_sql = "SELECT * FROM work.sample_table WHERE id = $1" result = await db.execute_query(select_sql, (target_id,)) return result async def main(): # 接続プールを初期化する await DbConnectAsyncPostgres.initialize_connection_pool() # 非同期タスクを並行実行 tasks = [ select_data(1), select_data(2), select_data(3), ] results = await asyncio.gather(*tasks) pprint(results) # 接続プールをクローズする await DbConnectAsyncPostgres.close_connection_pool() if __name__ == "__main__": # asyncio.runを使用してメインコルーチンを実行 asyncio.run(main())
【実行結果】 [[<Record id=1 str1='test_str0' value1=0 last_update_datetime=datetime.datetime(2024, 3, 22, 6, 47, 5, 665664)>], [<Record id=2 str1='test_str1' value1=10 last_update_datetime=datetime.datetime(2024, 3, 22, 6, 47, 5, 840874)>], [<Record id=3 str1='test_str3' value1=30 last_update_datetime=datetime.datetime(2024, 3, 22, 6, 47, 5, 966835)>]]
SELECT
のための関数として、select_data
という非同期関数を用意しています。これは特定のid
のデータを検索する関数です。この関数ではexecute_query
にSQLとバインド変数として受け取ったid
を渡してSELECT
を実行し、実行結果を返却します。
非同期タスクの実行についても、これまでと同様でタスクのリストを用意し、asyncio.gather
に渡します。今回はid=1
、id=2
、id=3
の場合のSELECT
を非同期に実行しています。asyncio.gather
は結果をまとめてくれます。
今回のSELECT
の例は非同期で実行する必要もないようなものかもしれませんが、非同期での検索の使い方を説明するためということで理解してください。より複雑で負荷の高いSELECTの場合、非同期で実行して最後に結果をまとめることで効率的に検索ができる可能性があります。
まとめ
Pythonで、asyncpg
を用いて非同期にPostgreSQLデータベースへアクセスする方法を解説しました。
asyncpg
とは、PostgreSQLデータベースとの非同期通信をサポートするPythonのサードパーティ製ライブラリです。asyncpg
は高速であることで知られており、今回はasyncpg
を使って非同期にPostgreSQLへアクセスするためのクラスを作成してみました。また、作成したクラスの使い方についても例を使って紹介しています。
非同期プログラミングは、I/Oバウンドな処理を実装するためには非常に有益な実装方法です。DBアクセスについても適切に使用することができれば、性能向上できる可能性が高くなります。ぜひ、asyncpg
について使い方を理解していただき、非同期処理をしてみてもらいたいと思います。