【Python】asyncpgを用いた非同期データベース処理 (PostgreSQL)

Python で asyncpg を用いて非同期に PostgreSQL データベースへアクセスする方法を解説します。
目次
asyncpg による非同期データベース処理 (PostgreSQL)
非同期処理とは、複数のタスクが協調しあって処理を実行するプログラミングの実装方法です。非同期プログラミングは多くの支持を得ており、Python 3.5 でも async や await といった非同期プログラミング方法が導入されています。
async / await を用いたプログラミングの基本は「async/awaitを用いた非同期プログラミングの基本」でまとめているので参考にしてください。
この記事では、非同期処理での効果が期待できる DB アクセスに焦点を当てます。具体的には、asyncpg を用いて非同期に PostgreSQL データベースへアクセスする方法を紹介します。
asyncpg とは
asyncpg は、PostgreSQL データベースの非同期処理をサポートする Python のサードパーティ製ライブラリです。公式ドキュメントはこちらを参照してください。
非同期プログラミングは、I/O バウンドな処理を実装するためには非常に有益な実装方法です。I/O バウンドな処理は、ファイル入出力やネットワーク通信等が該当します。
PostgreSQL にアクセスする際には同期的なパッケージとして psycopg2 がよく使用されます。また、PostgreSQL 用の非同期ライブラリとしては他にも aiopg というパッケージもあります。aiopg は psycopg2 をベースにしているため、psycopg2 を利用している人には適しているかもしれません。一方で、asyncpg は高速であることで知られています。
今回は asyncpg の使い方について説明していきたいと思います。
asyncpg を用いた非同期通信の基本
この記事では「psycopg2を用いたPostgreSQLデータベースへのアクセス方法」で紹介した PostgreSQL データベースへのアクセスクラスの非同期版クラスを作成してみたいと思います。
asyncpg のインストール
asyncpg はサードパーティ製ライブラリのため、インストールが必要です。以下のように pip でインストールをしてください。
pip install asyncpg
インストールが完了したら非同期に PostgreSQL を扱うことが可能です。
サンプルプログラムの前提条件
この記事では PostgreSQL のインストール自体については説明を省略します。以降で紹介するサンプルプログラムを実行するにあたっての前提条件は以下とします。
- PostgreSQL がローカル PC にインストールされているものとします。
- DB「test」、スキーマ「work」が作成されているものとします。
- ユーザー「test」、パスワードは「PAssw0rd」とし、work スキーマに対して適切に権限が設定されているものとします。
(GRANT ALL ON SCHEMA work TO test;)
以降では、設定ファイルの記載も含めて紹介しています。ご利用の環境に合わせて設定ファイルやプログラムは修正して動作させることも可能です。
asyncpg を用いた PostgreSQL アクセスクラス
asyncpg を用いて PostgreSQL データベースへアクセスするためのクラスを作成します。設定ファイルおよびサンプルプログラムをまず紹介します。具体的な詳細については以降で説明していきます。
【dbconfig.ini】DB アクセス設定ファイル
[POSTGRESQL_DB_SERVER] host = localhost port = 5432 dbname = test user = test password = PAssw0rd [POOL] min_size=1 max_size=10
【db_connect_asyncpg.py】非同期 DB アクセス用クラス
import configparser
import asyncpg
class DbConnectAsyncPostgres:
# 接続プール
connection_pool = None
@classmethod
async def initialize_connection_pool(
cls, config_file="./dbconfig.ini"
) -> None:
"""接続プールの初期化
Args:
config_file: 設定ファイルパス
Returns:
None
"""
if cls.connection_pool is None:
# コンフィグファイルからデータを取得
config_db = configparser.ConfigParser()
config_db.read(config_file)
# 接続情報の取得
host = config_db["POSTGRESQL_DB_SERVER"]["host"]
port = config_db["POSTGRESQL_DB_SERVER"]["port"]
dbname = config_db["POSTGRESQL_DB_SERVER"]["dbname"]
user = config_db["POSTGRESQL_DB_SERVER"]["user"]
password = config_db["POSTGRESQL_DB_SERVER"]["password"]
# プール設定の取得
min_size = int(config_db["POOL"]["min_size"])
max_size = int(config_db["POOL"]["max_size"])
# DSN(Data Source Name)の作成
dsn = f"postgresql://{user}:{password}@{host}:{port}/{dbname}"
# 非同期接続プールの初期化
cls.connection_pool = await asyncpg.create_pool(
dsn=dsn,
min_size=min_size,
max_size=max_size,
)
@classmethod
async def close_connection_pool(cls):
"""接続プールをクローズする"""
if cls.connection_pool:
await cls.connection_pool.close()
cls.connection_pool = None
def __init__(self) -> None:
"""コンストラクタ"""
self.conn = None
self.transaction = None
async def connect(self) -> None:
"""DB接続"""
if self.connection_pool is None:
raise Exception("接続プールが初期化されていません")
# 接続プールから非同期でコネクションを取得
self.conn = await self.connection_pool.acquire()
async def close(self) -> None:
"""DBクローズ"""
if self.conn:
# 接続プールに返却する
await self.connection_pool.release(self.conn)
self.conn = None
async def __aenter__(self):
# DB接続
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# DBクローズ
await self.close()
async def execute_non_query(
self, sql: str, bind_var: tuple = None
) -> None:
"""CREATE/INSERT/UPDATE/DELETEのSQL実行メソッド
Args:
sql: 実行SQL
bind_var: バインド変数
Returns:
None
"""
# SQLの実行 (bind_varがNoneの場合は()を設定し、アンパックして渡す)
await self.conn.execute(sql, *(bind_var or ()))
async def execute_query(
self, sql: str, bind_var: tuple = None, count: int = 0
) -> list[asyncpg.Record]:
"""SELECTのSQL実行メソッド
Args:
sql: 実行SQL
bind_var: バインド変数
count: データ取得件数 (0は全ての行を取得)
Returns:
結果リスト (asyncpg.Record型)
"""
stmt = await self.conn.prepare(sql)
# データ取得件数が指定されている場合
if count > 0:
rows = await stmt.fetch(count, *(bind_var or ()))
else:
rows = await stmt.fetch(*(bind_var or ()))
return rows
async def start_transaction(self) -> None:
"""トランザクションの開始"""
# コネクションでトランザクションを開始する
self.transaction = self.conn.transaction()
# トランザクションを開始する
await self.transaction.start()
async def commit_transaction(self) -> None:
"""トランザクションのコミット"""
await self.transaction.commit()
async def rollback_transaction(self) -> None:
"""トランザクションのロールバック"""
await self.transaction.rollback()サンプルコードの詳細説明
DB 接続設定ファイル dbconfig.ini
データベースへアクセスする際には、接続のための設定情報が必要です。DB の設定情報は、DB アクセス設定ファイルとして dbconfig.ini として用意します。
[POSTGRESQL_DB_SERVER] host = localhost port = 5432 dbname = test user = test password = PAssw0rd [POOL] min_size=1 max_size=10
設定ファイルには以下の情報を記載しています。
| 設定値 | 概要 |
|---|---|
host | 具体的なサーバーのホスト名や IP アドレスを記載します。今回はローカル PC を使うため localhost としています。 |
port | PostgreSQL へアクセスするためのポートです。PostgreSQL ではデフォルトで 5432 が使用されます。お使いの環境で変更している場合は修正が必要です。 |
dbname | アクセスするデータベース名を記載します。 |
user | アクセスするユーザーを記載します。 |
password | アクセス時のパスワードを記載します。 |
以下の項目については、データベースへの接続プールに関する設定項目です。
| 設定値 | 概要 |
|---|---|
min_size | 接続プールの最小コネクション数を表します。 |
max_size | 接続プールの最大コネクション数を表します。 |
非同期DBアクセス用クラス DbConnectAsyncPostgres
DB アクセス用のクラスとして DbConnectAsyncPostgres クラスを定義しています。このクラスを DB を使用するプログラムから呼び出すことで非同期に PostgreSQL へアクセスできます。また、このクラスは接続プールやコンテキストマネージャーに対応しています。
定義メソッドの概要は以下になります。
| メソッド名 | 概要 |
|---|---|
initialize_connection_pool | 接続プールを作成する初期化メソッドです。設定ファイルから接続状況を読み込み、接続プールを作成します。 このメソッドは classmethod として定義し、アプリケーション起動時に呼び出します。 |
close_connection_pool | 接続プールをクローズするメソッドです。 このメソッドは classmethod として定義し、アプリケーション終了時に呼び出します。 |
__init__ | DbConnectAsyncPostgres クラスのコンストラクタです。 |
connect | 接続プールから、非同期にコネクションを取得します。 |
close | データベースのクローズ処理を行います。コネクションは、接続プールへ返却します。 |
__aenter__ | with 句を実行する際に呼び出される非同期に対応したコンテキストマネージャーのメソッドです。connect メソッドを呼び出すことで DB 接続を行い、戻り値を as で指定した変数に反映します。 |
__aexit__ | with 句を抜ける時に呼び出される非同期に対応したコンテキストマネージャーのメソッドです。例外が発生した場合には、exc_type、exc_val、exc_tb に例外情報が渡されます。例外発生時はロールバックし、例外がなければコミットしたうえで、DB 接続をクローズします。 |
execute_non_query | CREATE / INSERT / UPDATE / DELETE 処理用の SQL 実行メソッドです。SQL 文と必要に応じてバインド変数を渡して処理を実行します。 |
execute_query | SELECT 処理用の SQL 実行メソッドです。返却値としては辞書のリストを返します。SQL 文と必要に応じてバインド変数、データ取得件数を渡して処理を実行します。 |
start_transaction | トランザクションを開始するメソッドです。 |
commit_transaction | コミットを行うメソッドです。 |
rollback_transaction | ロールバックを行うメソッドです。 |
それぞれのメソッドのポイントについて以降で説明します。なお、非同期に実行される関数については「async_def」で定義されることに注意してください。また、各関数内で呼び出す際も、非同期に呼び出す場合には「await」キーワードが必要です。
接続プールの初期化:initialize_connection_pool
initialize_connection_pool は、接続プールの初期化を行います。初期化の際には、設定ファイルから DB の接続情報と接続プールの設定値を取得します。
# 接続プール
connection_pool = None
@classmethod
async def initialize_connection_pool(
cls, config_file="./dbconfig.ini"
) -> None:
"""接続プールの初期化
Args:
config_file: 設定ファイルパス
Returns:
None
"""
if cls.connection_pool is None:
# コンフィグファイルからデータを取得
config_db = configparser.ConfigParser()
config_db.read(config_file)
# 接続情報の取得
host = config_db["POSTGRESQL_DB_SERVER"]["host"]
port = config_db["POSTGRESQL_DB_SERVER"]["port"]
dbname = config_db["POSTGRESQL_DB_SERVER"]["dbname"]
user = config_db["POSTGRESQL_DB_SERVER"]["user"]
password = config_db["POSTGRESQL_DB_SERVER"]["password"]
# プール設定の取得
min_size = int(config_db["POOL"]["min_size"])
max_size = int(config_db["POOL"]["max_size"])
# DSN(Data Source Name)の作成
dsn = f"postgresql://{user}:{password}@{host}:{port}/{dbname}"
# 非同期接続プールの初期化
cls.connection_pool = await asyncpg.create_pool(
dsn=dsn,
min_size=min_size,
max_size=max_size,
)initialize_connection_pool は、@classmethod でクラスメソッドとして定義し、アプリケーション起動時に 1 度呼び出します。コンストラクタ (__init__) で定義しないのはインスタンス化のたびに接続プールを作成する必要はないためです。
接続プールを作成するには asyncpg.create_pool を使用します。接続時にはDSN (Data Source Name) を指定しますが postgresql://{user}:{password}@{host}:{port}/{dbname} という形式をとります。今回の場合、具体的には「postgresql://test:PAssw0rd@localhost:5432/test」となります。
min_size は最小接続数、max_size は最大接続数を表し、設定ファイルで min_size=1、max_size=10 としているので最大 10 のコネクションを使用可能です。
接続プールのクローズ:close_connection_pool
接続プールについては、アプリケーションで不要になったときに close を使用してクローズします。
@classmethod
async def close_connection_pool(cls):
"""接続プールをクローズする"""
if cls.connection_pool:
await cls.connection_pool.close()
cls.connection_pool = None接続プールをクローズする際には close メソッドを使用して、すべての接続をクローズします。connection_pool に None を設定することは必須ではありませんが、ガーベージコレクションの対象となることを示すために None を設定しています。
コンストラクタ:__init__
コンストラクタ (__init__) では、DB への接続インスタンスごとにコネクションとトランザクション用の変数を用意しておきます。具体的なコネクション取得は、以降の connect メソッドで行います。
def __init__(self) -> None:
"""コンストラクタ"""
self.conn = None
self.transaction = NoneDB 接続:connect
connect メソッドでは、接続プールからコネクションを取得します。
async def connect(self) -> None:
"""DB接続"""
if self.connection_pool is None:
raise Exception("接続プールが初期化されていません")
# 接続プールから非同期でコネクションを取得
self.conn = await self.connection_pool.acquire()コネクションを取得する際には、接続プールから「self.connection_pool.acquire()」というように acquire メソッドを使用してコネクションを取得します。
DB クローズ:close
close メソッドでは、データベースのクローズ処理を行います。
async def close(self) -> None:
"""DBクローズ"""
if self.conn:
# 接続プールに返却する
await self.connection_pool.release(self.conn)
self.conn = Noneコネクションについては、接続プールに返却するために release メソッドにコネクションを渡します。conn に None を設定することは必須ではありませんが、ガーベージコレクションの対象となることを示すために None を設定しています。
コンテキストマネージャ:__aenter__ 及び __aexit__
DbConnectAsyncPostgres クラスは、with 句を使用できるようにコンテキストマネージャーに対応して実装します。コンテキストマネージャーに対応するためには、具体的には __aenter__ メソッドと __aexit__ メソッドの実装が必要です。通常のコンテキストマネージャのメソッドとは違い「a」がついた非同期用のメソッドになっている違いがあることに注意してください。
async def __aenter__(self):
# DB接続
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# DBクローズ
await self.close()__aenter__ メソッドでは、DB 接続を行います。これにより with 句に入った際に DB 接続を行い、as で指定された変数に DB クラスのインスタンスを設定します。
with 句を抜ける際には __aexit__ メソッドが呼び出されます。この際に close メソッドでデータベースのクローズ処理を行います。
SQL実行:execute_non_query
SQL 実行用のメソッドとして execute_non_query を用意しています。後述する execute_query との違いは返却値がないことで CREATE / INSERT / UPDATE / DELETE などの SQL を実行するために使用します。
async def execute_non_query(
self, sql: str, bind_var: tuple = None
) -> None:
"""CREATE/INSERT/UPDATE/DELETEのSQL実行メソッド
Args:
sql: 実行SQL
bind_var: バインド変数
Returns:
None
"""
# SQLの実行 (bind_varがNoneの場合は()を設定し、アンパックして渡す)
await self.conn.execute(sql, *(bind_var or ()))SQL を実行するためにはコネクションの execute メソッドに対象 SQL を渡します。また、バインド変数がある場合には引数に指定して実行することができます。
バインド変数は、アンパックして渡す必要がありますが or を使用して簡潔に記載しています。この記載方法では bind_var が None の場合は () が渡されます。
SQL実行:execute_query
SQL 実行用のメソッドとしてもう 1 つ execute_query を用意しています。上記の execute_non_query との違いは返却値があることです。SELECT してデータを取得する SQL を実行するために使用します。
async def execute_query(
self, sql: str, bind_var: tuple = None, count: int = 0
) -> list[asyncpg.Record]:
"""SELECTのSQL実行メソッド
Args:
sql: 実行SQL
bind_var: バインド変数
count: データ取得件数 (0は全ての行を取得)
Returns:
結果リスト (asyncpg.Record型)
"""
stmt = await self.conn.prepare(sql)
# データ取得件数が指定されている場合
if count > 0:
rows = await stmt.fetch(count, *(bind_var or ()))
else:
rows = await stmt.fetch(*(bind_var or ()))
return rows
SQL を実行するためには、コネクションの fetch メソッドに対象となる SQL を渡しますが、その前に prepare メソッドで SQL を stmt として準備します。
SELECT 結果を全て取得する場合は stmt.fetch にバインド変数のみ渡して呼び出します。count が指定されている (>0) 場合には、count を引数に指定することでその件数を取得することができます。
なお、返却の rows は asyncpg.Record 型のリストとなります。必要に応じて結果を dict() で Python の辞書に変換することも可能です。
トランザクションの開始:start_transaction
asyncpg では、自動コミットモードを採用しています。そのため、SQL を実行したタイミングで自動でコミットされますが、複数 SQL で 1 つのトランザクションととらえたい場合があります。このような場合は、以下の start_transaction を使用します。
async def start_transaction(self) -> None:
"""トランザクションの開始"""
# コネクションでトランザクションを開始する
self.transaction = self.conn.transaction()
# トランザクションを開始する
await self.transaction.start()コネクションからトランザクションを開始するには、コネクションの transaction メソッドにより接続からトランザクションを用意し start メソッドでトランザクションを開始します。
コミット:commit_transaction
start_transaction によりトランザクションを開始した場合には、適切な位置でコミットを明示的に行う必要があります。この場合は commit_transaction を使用します。
async def commit_transaction(self) -> None:
"""トランザクションのコミット"""
await self.transaction.commit()内部的にはトランザクションの commit メソッドを呼び出しています。コミット時に他の処理が必要な場合は追加してください。
ロールバック:rollback_transaction
コミット前のデータベースの変更をトランザクション開始時点まで戻したい場合は、ロールバックしてデータベースの状態を元に戻す必要があります。この場合には rollback_transaction を使用します。
async def rollback_transaction(self) -> None:
"""トランザクションのロールバック"""
await self.transaction.rollback()内部的にはトランザクションの rollback メソッドを呼び出しています。ロールバック時に他の処理が必要な場合は追加してください。
DB アクセスクラスの使い方
上記で定義した DbConnectAsyncPostgres クラスは、他のプログラムから使用することが可能です。以降では、DbConnectPostgres クラスを使用して、データの追加 (INSERT)、データ検索 (SELECT) の使い方を紹介します。
基本的な使い方 (自動コミットモードを使用)
データを INSERT する
データを非同期に登録 (INSERT) したい場合には、以下のようにします。
import asyncio
from db_connect_asyncpg import DbConnectAsyncPostgres
async def insert_data(value1, value2):
"""データをインサートする (自動コミットモードを使用する場合)"""
async with DbConnectAsyncPostgres() as db:
insert_sql = (
"INSERT INTO work.sample_table "
"(str1, value1, last_update_datetime) "
"VALUES(\$1, \$2, current_timestamp)"
)
await db.execute_non_query(insert_sql, (value1, value2))
async def main():
# 接続プールを初期化する
await DbConnectAsyncPostgres.initialize_connection_pool()
# 非同期タスクを並行実行
tasks = [insert_data(f"test_str{i}", 10 * i) for i in range(10)]
await asyncio.gather(*tasks)
# 接続プールをクローズする
await DbConnectAsyncPostgres.close_connection_pool()
if __name__ == "__main__":
# asyncio.runを使用してメインコルーチンを実行
asyncio.run(main())以降でポイントを説明していきます。
モジュールのインポート
DB アクセスクラスを使うには、作成モジュールを以下のようにインポートします。
from db_connect_asyncpg import DbConnectAsyncPostgres
接続プールの初期化とクローズ
今回のクラスでは接続プールを使用していますので、使用前に接続プールを初期化する必要があります。
# 接続プールを初期化する
await DbConnectAsyncPostgres.initialize_connection_pool()
...(省略)...
# 接続プールをクローズする
await DbConnectAsyncPostgres.close_connection_pool()接続プールは、プールからコネクションを取得して使う考え方であるため、アプリケーション起動時に 1 度作成し、終了時にクローズします。
非同期 INSERT 関数
非同期な INSERT 用関数として insert_data 関数を定義しています。
async def insert_data(value1, value2):
"""データをインサートする (自動コミットモードを使用する場合)"""
async with DbConnectAsyncPostgres() as db:
insert_sql = (
"INSERT INTO work.sample_table "
"(str1, value1, last_update_datetime) "
"VALUES(\$1, \$2, current_timestamp)"
)
await db.execute_non_query(insert_sql, (value1, value2))今回作成したクラスは、コンテキストマネージャーを使用できるため、with 句を使ってインスタンスを作成することができます。
with 句に入った際に connect メソッドにより接続プールからコネクションが取得され with 句を抜ける際にコネクションは返却されます。asyncpg は、自動コミットモードとなっているため execute_non_query が実行されたタイミングでコミットされます。
また、SQL インジェクションを防ぐ手段の一つであるプレースホルダーですが、asyncpg では値を埋め込むための SQL のプレースホルダーとして $1、$2 といった表記を使用し、引数に渡すタプルの値が順に設定されます。psycopg2 では %s といった表記であり少し異なっています。
非同期タスクの作成と実行
メインコルーチン (main) では、接続プールを生成した後にタスクを作成しています。
# 非同期タスクを並行実行
tasks = [insert_data(f"test_str{i}", 10 * i) for i in range(10)]
await asyncio.gather(*tasks)タスクは、リスト内包表記を使って 10 個のタスクを用意しています。このリストを非同期に実行する際に asyncio.gather を使用します。これにより、各タスクは非同期に実行されます。
トランザクションを明確に管理する使い方
asyncpg は、自動コミットモードを採用しているため、SQL を実行したタイミングで自動的にコミットまたはロールバックが行われます。これは 1 つの SQL を 1 トランザクションととらえていることになります。
しかし、実際には複数 SQL を 1 トランザクションとして考えたい場合もあります。そのような場合は、以下のように明示的にトランザクションの開始やコミットを実行することができます。
import asyncio
from db_connect_asyncpg import DbConnectAsyncPostgres
async def insert_data_with_transaction(value1, value2):
"""データをインサートする (トランザクションを明示的に管理する場合)"""
async with DbConnectAsyncPostgres() as db:
try:
# トランザクションの開始
await db.start_transaction()
# インサートを実行
insert_sql = (
"INSERT INTO work.sample_table "
"(str1, value1, last_update_datetime) "
"VALUES(\$1, \$2, current_timestamp)"
)
await db.execute_non_query(insert_sql, (value1, value2))
# コミット
await db.commit_transaction()
except Exception as ex:
await db.rollback_transaction()
print(f"エラー発生: {ex}")
async def main():
# 接続プールを初期化する
await DbConnectAsyncPostgres.initialize_connection_pool()
# 非同期タスクを並行実行
tasks = [
insert_data_with_transaction(f"test_str{i}", 10 * i) for i in range(10)
]
await asyncio.gather(*tasks)
# 接続プールをクローズする
await DbConnectAsyncPostgres.close_connection_pool()
if __name__ == "__main__":
# asyncio.runを使用してメインコルーチンを実行
asyncio.run(main())基本的な使い方は先に説明したプログラムと同様ですが async with のブロックの中で明示的に start_transaction によりトランザクションを開始してます。
トランザクションを明示的に開始した場合には execute_non_query を実行したタイミングではコミットは行われません。そのため、明示的に commit_transaction でコミットを行う必要があります。また、try ... except ... を用いることで例外をキャッチした場合には rollback_transaction によりロールバックする構成となっています。
このように明示的にトランザクションの範囲を制御することで、複数 SQL の実行を 1 トランザクションとして使うことも可能になります。
今回は、INSERT の例を紹介しましたが UPDATE や DELETE についても同様の考え方で使用することが可能です。
データを検索する
今回作成したクラスでデータを検索 (SELECT) する例についても見てみましょう。複数の SELECT を非同期に実行する場合には、以下のように実行します。
import asyncio
from pprint import pprint
from db_connect_asyncpg import DbConnectAsyncPostgres
async def select_data(target_id):
"""データを検索する"""
async with DbConnectAsyncPostgres() as db:
select_sql = "SELECT * FROM work.sample_table WHERE id = $1"
result = await db.execute_query(select_sql, (target_id,))
return result
async def main():
# 接続プールを初期化する
await DbConnectAsyncPostgres.initialize_connection_pool()
# 非同期タスクを並行実行
tasks = [
select_data(1),
select_data(2),
select_data(3),
]
results = await asyncio.gather(*tasks)
pprint(results)
# 接続プールをクローズする
await DbConnectAsyncPostgres.close_connection_pool()
if __name__ == "__main__":
# asyncio.runを使用してメインコルーチンを実行
asyncio.run(main())【実行結果】 [[<Record id=1 str1='test_str0' value1=0 last_update_datetime=datetime.datetime(2024, 3, 22, 6, 47, 5, 665664)>], [<Record id=2 str1='test_str1' value1=10 last_update_datetime=datetime.datetime(2024, 3, 22, 6, 47, 5, 840874)>], [<Record id=3 str1='test_str3' value1=30 last_update_datetime=datetime.datetime(2024, 3, 22, 6, 47, 5, 966835)>]]
SELECT のための関数として select_data という非同期関数を用意しています。これは特定の id のデータを検索する関数です。この関数では execute_query に SQL とバインド変数として受け取った id を渡して SELECT を実行し、実行結果を返却します。
非同期タスクの実行についても、これまでと同様でタスクのリストを用意し asyncio.gather に渡します。今回は id=1、id=2、id=3 の場合の SELECT を非同期に実行しています。asyncio.gather は結果をまとめてくれます。
今回の SELECT の例は非同期で実行する必要もないようなものかもしれませんが、非同期での検索の使い方を説明として紹介しています。より複雑で負荷の高い SELECT の場合、非同期で実行し、最後に結果をまとめることで効率的に検索ができます。
まとめ
Python で asyncpg を用いて非同期に PostgreSQL データベースへアクセスする方法を解説しました。
asyncpg は、PostgreSQL データベースとの非同期通信をサポートする Python のサードパーティ製ライブラリです。asyncpg は高速であることで知られており、今回は asyncpg を使って非同期に PostgreSQL へアクセスするためのクラスを作成してみました。また、作成したクラスの使い方についても例を使って紹介しています。
非同期プログラミングは、I/O バウンドな処理を実装するためには非常に有益な実装方法です。DB アクセスについても適切に使用することができれば、性能向上できる可能性が高くなります。ぜひ、asyncpg について使い方を理解していただき、非同期処理をしてみてもらいたいと思います。

