【Python】asyncpgを用いた非同期データベース処理 (PostgreSQL)

Python で asyncpg を用いて非同期に PostgreSQL データベースへアクセスする方法を解説します。
目次
asyncpg による非同期データベース処理 (PostgreSQL)
非同期処理とは、複数のタスクが協調しあって処理を実行するプログラミングの実装方法です。非同期プログラミングは多くの支持を得ており、Python でも async や await といった非同期プログラミング方法が導入されています。
async / await を用いたプログラミングの基本は「async/awaitを用いた非同期プログラミングの基本」を参考にしてください。
この記事では、非同期処理での効果が期待できる DB アクセスに焦点を当てます。具体的には、asyncpg を用いて非同期に PostgreSQL データベースへアクセスする方法を紹介します。
asyncpg とは
asyncpg は、PostgreSQL データベースの非同期処理をサポートする Python のサードパーティ製ライブラリです。公式ドキュメントはこちらを参照してください。なお、asyncpg は Python 3.9 以降のバージョンを使う必要があります。
非同期プログラミングは、I/O バウンドな処理を実装するためには非常に有益な実装方法です。I/O バウンドな処理は、ファイル入出力やネットワーク通信等が該当します。
PostgreSQL にアクセスする Python ライブラリとしては、従来から psycopg2 が広く利用されてきました。psycopg2 は、同期型の PostgreSQL クライアントライブラリであり、多くの Python アプリケーションで利用されています。
近年では、psycopg2 の後継となる psycopg (psycopg3) が登場し、非同期 API (async / await) を利用したデータベースアクセスもサポートされています。
また、PostgreSQL 用の非同期ライブラリとしては、aiopg というパッケージも存在します。aiopg は、psycopg2 をベースに実装されており、psycopg2 を利用しているプロジェクトから移行しやすいという特徴があります。
一方で、asyncpg は、PostgreSQL の非同期通信に特化して設計されたライブラリで、高速に動作することで知られています。そのため、高いパフォーマンスが求められるアプリケーションや非同期処理を中心としたシステムで利用されることが多くあります。
この記事では、asyncpg 利用して PostgreSQL データベースへ非同期にアクセスする方法について解説します。
asyncpg を用いた非同期通信の基本
この記事では PostgreSQL データベースに asyncpg を用いて非同期にアクセスするクラスを作成して使用してみたいと思います。
asyncpg のインストール
asyncpg はサードパーティ製ライブラリのため、インストールが必要です。以下のように pip でインストールできます。
pip install asyncpg
uv を使用している場合は、以下のようにインストールできます。
uv add asyncpg
インストール完了したら Python で非同期に PostgreSQL を扱えるようになります。
サンプルプログラムの前提条件
この記事では PostgreSQL のインストール自体の説明は省略します。以降で紹介するサンプルプログラムを実行するにあたっての前提条件は以下とします。
- PostgreSQL がローカル PC にインストールされているものとします。
- DB「
test」、スキーマ「work」が作成されているものとします。 - ユーザー「
test」、パスワードは「PAssw0rd」とし、workスキーマに対して適切に権限が設定されているものとします。
(GRANT ALL ON SCHEMA work TO test;)
以降では、設定ファイルの記載も含めて紹介しています。ご利用の環境に合わせて設定ファイルやプログラムは修正してください。
asyncpg を用いた非同期対応の PostgreSQL アクセスクラス
asyncpg を用いて PostgreSQL データベースへ非同期にアクセスするためのクラスを作成します。設定ファイルおよびサンプルプログラムをまず紹介します。具体的な詳細については以降で説明します。
【dbconfig.ini】DB アクセス設定ファイル
[POSTGRESQL_DB_SERVER] host = localhost port = 5432 dbname = test user = test password = PAssw0rd [POOL] min_size=1 max_size=10
【db_connect_asyncpg.py】非同期 DB アクセス用クラス
import configparser
import asyncpg
class DbConnectAsyncPostgres:
# 接続プール
connection_pool = None
@classmethod
async def initialize_connection_pool(cls, config_file="./dbconfig.ini") -> None:
"""接続プールの初期化
Args:
config_file: 設定ファイルパス
Returns:
None
"""
if cls.connection_pool is None:
# コンフィグファイルからデータを取得
config_db = configparser.ConfigParser()
config_db.read(config_file)
# 接続情報の取得
host = config_db["POSTGRESQL_DB_SERVER"]["host"]
port = config_db["POSTGRESQL_DB_SERVER"]["port"]
dbname = config_db["POSTGRESQL_DB_SERVER"]["dbname"]
user = config_db["POSTGRESQL_DB_SERVER"]["user"]
password = config_db["POSTGRESQL_DB_SERVER"]["password"]
# プール設定の取得
min_size = int(config_db["POOL"]["min_size"])
max_size = int(config_db["POOL"]["max_size"])
# DSN(Data Source Name)の作成
dsn = f"postgresql://{user}:{password}@{host}:{port}/{dbname}"
# 非同期接続プールの初期化
cls.connection_pool = await asyncpg.create_pool(
dsn=dsn,
min_size=min_size,
max_size=max_size,
)
@classmethod
async def close_connection_pool(cls):
"""接続プールをクローズする"""
if cls.connection_pool:
await cls.connection_pool.close()
cls.connection_pool = None
def __init__(self) -> None:
"""コンストラクタ"""
self.conn = None
self.transaction = None
async def connect(self) -> None:
"""DB接続"""
if self.__class__.connection_pool is None:
raise RuntimeError("接続プールが初期化されていません。")
# 接続プールから非同期でコネクションを取得
self.conn = await self.__class__.connection_pool.acquire()
async def close(self) -> None:
"""DBクローズ"""
if self.conn:
# 接続プールに返却する
await self.__class__.connection_pool.release(self.conn)
self.conn = None
async def __aenter__(self):
# DB接続
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# DBクローズ
await self.close()
async def execute_non_query(self, sql: str, bind_var: tuple = None) -> None:
"""CREATE / INSERT / UPDATE / DELETEの SQL 実行メソッド
Args:
sql: 実行SQL
bind_var: バインド変数
Returns:
None
"""
# SQLの実行 (bind_varがNoneの場合は()を設定し、アンパックして渡す)
await self.conn.execute(sql, *(bind_var or ()))
async def execute_query(
self, sql: str, bind_var: tuple = None
) -> list[asyncpg.Record]:
"""SELECT の SQL 実行メソッド
Args:
sql: 実行 SQL
bind_var: バインド変数
Returns:
結果リスト (asyncpg.Record型)
"""
return await self.conn.fetch(sql, *(bind_var or ()))
async def execute_query_one(
self, sql: str, bind_var: tuple = None
) -> asyncpg.Record | None:
"""SELECT の SQL 実行メソッド(1件取得)
Args:
sql: 実行 SQL
bind_var: バインド変数
Returns:
結果 (asyncpg.Record型)
"""
return await self.conn.fetchrow(sql, *(bind_var or ()))
async def start_transaction(self) -> None:
"""トランザクションの開始"""
# コネクションでトランザクションを開始する
self.transaction = self.conn.transaction()
# トランザクションを開始する
await self.transaction.start()
async def commit_transaction(self) -> None:
"""トランザクションのコミット"""
await self.transaction.commit()
async def rollback_transaction(self) -> None:
"""トランザクションのロールバック"""
await self.transaction.rollback()サンプルコードの詳細説明
DB 接続設定ファイル dbconfig.ini
データベースへアクセスするには、接続設定が必要です。DB 設定情報は、設定ファイルの dbconfig.ini として用意します。設定ファイルには以下の情報を含みます。
| 設定値 | 概要 |
|---|---|
host | 具体的なサーバーのホスト名や IP アドレスを記載します。今回はローカル PC を使うため localhost としています。 |
port | PostgreSQL へアクセスするためのポートで、デフォルトでは 5432 が使用されます。お使いの環境で変更している場合は修正が必要です。 |
dbname | アクセスするデータベース名を記載します。 |
user | アクセスするユーザーを記載します。 |
password | アクセス時のパスワードを記載します。 |
以下の項目は、データベースへの接続プールに関する設定項目です。
| 設定値 | 概要 |
|---|---|
min_size | 接続プールの最小コネクション数を表します。 |
max_size | 接続プールの最大コネクション数を表します。 |
非同期DBアクセス用クラス DbConnectAsyncPostgres
DB アクセス用のクラスとして DbConnectAsyncPostgres クラスを定義しています。このクラスを DB を使用するプログラムから呼び出すことで非同期に PostgreSQL へアクセスできます。また、接続プールやコンテキストマネージャーに対応しています。
| メソッド名 | 概要 |
|---|---|
initialize_connection_pool | 接続プールを作成する初期化メソッドです。設定ファイルから接続状況を読み込み、接続プールを作成します。 このメソッドは classmethod として定義し、アプリケーション起動時に呼び出します。 |
close_connection_pool | 接続プールをクローズするメソッドです。 このメソッドは classmethod として定義し、アプリケーション終了時に呼び出します。 |
__init__ | DbConnectAsyncPostgres クラスのコンストラクタです。 |
connect | 接続プールから、非同期にコネクションを取得します。 |
close | データベースのクローズ処理を行います。コネクションは、接続プールへ返却します。 |
__aenter__ | async with を実行時に呼び出される非同期に対応したコンテキストマネージャーのメソッドです。connect メソッドを呼び出すことで DB 接続を行い、戻り値を as で指定した変数に反映します。 |
__aexit__ | async with を抜ける時に呼び出される非同期に対応したコンテキストマネージャーのメソッドです。close を呼び出して、DB 接続をクローズします。 |
execute_non_query | CREATE / INSERT / UPDATE / DELETE 処理用の SQL 実行メソッドです。SQL 文と必要に応じてバインド変数を渡して処理を実行します。 |
execute_query | SELECT 処理用の SQL 実行メソッドです。返却値としては asyncpg.Record のリストを返します。SQL 文と必要に応じてバインド変数を渡して処理を実行します。 |
execute_query_one | SELECT したデータから先頭行 1 行のみ取得したい場合のメソッドです。返却値は asyncpg.Record で、取得結果がない場合は None を返します。 |
start_transaction | トランザクションを開始するメソッドです。 |
commit_transaction | コミットを行うメソッドです。 |
rollback_transaction | ロールバックを行うメソッドです。 |
それぞれのメソッドのポイントについて以降で説明します。なお、非同期に実行される関数については「async def」で定義されることに注意してください。また、非同期に呼び出す場合には「await」キーワードが必要です。
接続プールの初期化:initialize_connection_pool
initialize_connection_pool は、接続プールの初期化を行います。初期化の際には、設定ファイルから DB の接続情報と接続プールの設定値を取得します。
initialize_connection_pool は、@classmethod でクラスメソッドとして定義し、アプリケーション起動時に 1 度呼び出します。コンストラクタ (__init__) で定義しないのはインスタンス化のたびに接続プールを作成する必要はないためです。
接続プールを作成するには asyncpg.create_pool を使用します。接続時には DSN (Data Source Name) を指定しますが「postgresql://{user}:{password}@{host}:{port}/{dbname}」という形式をとります。今回の場合、具体的には「postgresql://test:PAssw0rd@localhost:5432/test」となります。
min_size は最小接続数、max_size は最大接続数を表し、設定ファイルで min_size=1、max_size=10 としているので最大 10 のコネクションを使用可能です。
接続プールのクローズ:close_connection_pool
接続プールは、アプリケーションで不要になった際にはクローズします。接続プールをクローズする際には close メソッドを使用し、すべての接続をクローズします。
connection_pool に None を設定することは必須ではありませんが、ガーベージコレクションの対象となることを示すために None を設定しています。
コンストラクタ:__init__
コンストラクタ (__init__) では、DB への接続インスタンスごとにコネクションとトランザクション用の変数を用意しておきます。具体的なコネクション取得は、以降の connect メソッドで行います。
DB 接続:connect
connect メソッドでは、接続プールからコネクションを取得します。コネクションを取得する際には、acquire メソッドを使用します。
DB クローズ:close
close メソッドでは、データベースのクローズ処理として、接続プールにコネクションを返却します。コネクションを返却するために release メソッドを使用します。
conn に None を設定することは必須ではありませんが、ガーベージコレクションの対象となることを示すために None を設定しています。
コンテキストマネージャ:__aenter__ 及び __aexit__
DbConnectAsyncPostgres クラスは、async with を使用できるようにコンテキストマネージャーに対応して実装します。コンテキストマネージャーに対応するためには、具体的には __aenter__ メソッドと __aexit__ メソッドの実装が必要です。通常のコンテキストマネージャのメソッドとは違い「a」がついた非同期用のメソッドになっている違いがあることに注意してください。
__aenter__ メソッドでは、async with に入った際に DB 接続を行い、as で指定された変数に DB クラスのインスタンスを設定します。async with を抜ける際には __aexit__ メソッドが呼び出されます。この際に close メソッドでデータベースのクローズ処理を行います。
SQL実行:execute_non_query
SQL 実行用のメソッドとして execute_non_query を用意しています。後述する execute_query との違いは返却値がないことで CREATE / INSERT / UPDATE / DELETE などの SQL を実行するために使用します。
SQL を実行するためにはコネクションの execute メソッドに対象 SQL を渡します。また、バインド変数がある場合には引数に指定して実行することができます。
バインド変数は、アンパックして渡す必要がありますが or を使用して簡潔に記載しています。この記載方法では bind_var が None の場合は () が渡されます。
SQL実行:execute_query
SQL 実行用のメソッドとしてもう 1 つ execute_query を用意しています。上記の execute_non_query との違いは返却値があることです。SELECT してデータを取得する SQL を実行するために使用します。
SQL を実行するためには、コネクションの fetch メソッドに対象となる SQL を渡します。返却の rows は asyncpg.Record 型のリストとなります。必要に応じて結果を dict() で Python の辞書に変換することも可能です。
トランザクションの開始:start_transaction
asyncpg では、自動コミットモードを採用しています。そのため、SQL を実行したタイミングで自動でコミットされますが、複数 SQL で 1 つのトランザクションととらえたい場合があります。このような場合は、start_transaction を使用します。
コネクションからトランザクションを開始するには、コネクションの transaction メソッドにより接続からトランザクションを用意し start メソッドでトランザクションを開始します。
コミット:commit_transaction
start_transaction によりトランザクションを開始した場合には、適切な位置でコミットを明示的に行う必要があります。この場合は commit_transaction を使用します。
内部的にはトランザクションの commit メソッドを呼び出しています。コミット時に他の処理が必要な場合は追加してください。
ロールバック:rollback_transaction
コミット前のデータベースの変更をトランザクション開始時点まで戻したい場合は、ロールバックしてデータベースの状態を元に戻す必要があります。この場合には rollback_transaction を使用します。
内部的にはトランザクションの rollback メソッドを呼び出しています。ロールバック時に他の処理が必要な場合は追加してください。
DB アクセスクラスの使い方
上記で定義した DbConnectAsyncPostgres クラスは、他のプログラムから共通的に使用することが可能です。以降では、DbConnectAsyncPostgres クラスを使用して、データの追加 (INSERT)、データ検索 (SELECT) をする方法を紹介します。
基本的な使い方 (自動コミットモードを使用)
データを INSERT する
データを非同期に登録 (INSERT) したい場合には、以下のようにします。
import asyncio
from db_connect_asyncpg import DbConnectAsyncPostgres
async def insert_data_with_auto_commit(value1, value2):
"""データをインサートする (自動コミットモードを使用する場合)"""
async with DbConnectAsyncPostgres() as db:
insert_sql = """
INSERT INTO work.sample_table
(str1, value1, last_update_datetime)
VALUES(\$1, \$2, current_timestamp)
"""
await db.execute_non_query(insert_sql, (value1, value2))
async def main():
# 接続プールを初期化する
await DbConnectAsyncPostgres.initialize_connection_pool()
try:
# 非同期タスクを並行実行
tasks = [
insert_data_with_auto_commit(f"test_str{i}", 10 * i) for i in range(10)
]
await asyncio.gather(*tasks)
finally:
# 接続プールをクローズする
await DbConnectAsyncPostgres.close_connection_pool()
if __name__ == "__main__":
# asyncio.runを使用してメインコルーチンを実行
asyncio.run(main())以降でポイントを説明していきます。
モジュールのインポート
DB アクセスクラスを使うには、「from db_connect_asyncpg import DbConnectAsyncPostgres」のようにインポートします。
接続プールの初期化とクローズ
今回のクラスでは接続プールを使用していますので、使用前に接続プールを初期化する必要があります。接続プールは、プールからコネクションを取得して使う考え方であるため、アプリケーション起動時に 1 度作成し、終了時にクローズします。
非同期 INSERT 関数
非同期な INSERT 用関数として insert_data_with_auto_commit を定義しています。
今回作成したクラスは、コンテキストマネージャーを使用できるため、async with を使ってインスタンスを作成することができます。
async with に入った際に connect メソッドにより接続プールからコネクションが取得され async with を抜ける際にコネクションは返却されます。asyncpg は、自動コミットモードのため execute_non_query が実行されたタイミングでコミットされます。
また、SQL インジェクションを防ぐ手段の 1 つであるプレースホルダーですが、asyncpg では値を埋め込むための SQL のプレースホルダーとして $1、$2 といった表記を使用し、引数に渡すタプルの値が順に設定されます。psycopg2 では %s といった表記であり少し異なっています。
非同期タスクの作成と実行
main では、接続プールを生成後に「tasks = [insert_data(f"test_str{i}", 10 * i) for i in range(10)]」でタスクのリストを作成しています。
タスクは、リスト内包表記を使って 10 個のタスクを用意しています。このリストを非同期に実行する際に asyncio.gather を使用します。これにより、各タスクは非同期に実行されます。
トランザクションを明確に管理する使い方
asyncpg は、自動コミットモードを採用しているため、SQL を実行したタイミングで自動的にコミットまたはロールバックが行われます。これは 1 つの SQL を 1 トランザクションととらえていることになります。
しかし、実際には複数 SQL を 1 トランザクションとして考えたい場合もあります。そのような場合は、以下のように明示的にトランザクションの開始やコミットを実行することができます。
import asyncio
from db_connect_asyncpg import DbConnectAsyncPostgres
async def insert_data_with_transaction(value1, value2):
"""データをインサートする (トランザクションを明示的に管理する場合)"""
async with DbConnectAsyncPostgres() as db:
try:
# トランザクションの開始
await db.start_transaction()
# インサートを実行
insert_sql = """
INSERT INTO work.sample_table
(str1, value1, last_update_datetime)
VALUES(\$1, \$2, current_timestamp)
"""
await db.execute_non_query(insert_sql, (value1, value2))
# コミット
await db.commit_transaction()
except Exception as ex:
if db.transaction is not None:
await db.rollback_transaction()
print(f"エラー発生: {ex}")
raise
async def main():
# 接続プールを初期化する
await DbConnectAsyncPostgres.initialize_connection_pool()
try:
# 非同期タスクを並行実行
tasks = [
insert_data_with_transaction(f"test_str{i}", 10 * i) for i in range(10)
]
await asyncio.gather(*tasks)
finally:
# 接続プールをクローズする
await DbConnectAsyncPostgres.close_connection_pool()
if __name__ == "__main__":
# asyncio.runを使用してメインコルーチンを実行
asyncio.run(main())基本的な使い方は先に説明したプログラムと同様ですが async with のブロックの中で明示的に start_transaction によりトランザクションを開始してます。
トランザクションを明示的に開始した場合には execute_non_query を実行したタイミングではコミットは行われません。そのため、明示的に commit_transaction でコミットを行う必要があります。また、try ... except ... を用いることで例外をキャッチした場合には rollback_transaction によりロールバックする構成となっています。
このように明示的にトランザクションの範囲を制御することで、複数 SQL の実行を 1 トランザクションとして使うことも可能になります。
今回は、INSERT の例を紹介しましたが UPDATE や DELETE についても同様の考え方で使用することが可能です。
データを検索する
作成したクラスでデータを検索 (SELECT) する例についても見てみましょう。複数の SELECT を非同期に実行する場合には、以下のように実行します。
import asyncio
from pprint import pprint
from db_connect_asyncpg import DbConnectAsyncPostgres
async def select_data(min_value):
"""データを検索する"""
async with DbConnectAsyncPostgres() as db:
select_sql = """
SELECT id, str1, value1, last_update_datetime
FROM work.sample_table
WHERE value1 >= $1
ORDER BY id
"""
result = await db.execute_query(select_sql, (min_value,))
return result
async def main():
# 接続プールを初期化する
await DbConnectAsyncPostgres.initialize_connection_pool()
try:
# 非同期タスクを並行実行
tasks = [
select_data(0),
select_data(20),
select_data(50),
]
results = await asyncio.gather(*tasks)
pprint(results)
finally:
# 接続プールをクローズする
await DbConnectAsyncPostgres.close_connection_pool()
if __name__ == "__main__":
# asyncio.runを使用してメインコルーチンを実行
asyncio.run(main())【実行結果】(表示が長いため途中省略) [[<Record id=1 str1='test_str0' value1=0 last_update_datetime=datetime.datetime(2026, 3, 14, 17, 58, 20, 892071)>, ...(途中省略)... <Record id=10 str1='test_str9' value1=90 last_update_datetime=datetime.datetime(2026, 3, 14, 17, 58, 21, 179438)>], [<Record id=3 str1='test_str2' value1=20 last_update_datetime=datetime.datetime(2026, 3, 14, 17, 58, 21, 102211)>, ...(途中省略)... <Record id=10 str1='test_str9' value1=90 last_update_datetime=datetime.datetime(2026, 3, 14, 17, 58, 21, 179438)>], [<Record id=6 str1='test_str5' value1=50 last_update_datetime=datetime.datetime(2026, 3, 14, 17, 58, 21, 177392)>, ...(途中省略)... <Record id=10 str1='test_str9' value1=90 last_update_datetime=datetime.datetime(2026, 3, 14, 17, 58, 21, 179438)>]]
SELECT のための関数として select_data という非同期関数を用意しています。これは最小値のなる min_value を渡してそれ以上の値のレコードを検索する関数です。この関数では execute_query に SQL とバインド変数として受け取った min_value を渡して SELECT を実行し、実行結果を返却します。
非同期タスクの実行は、これまでと同様でタスクのリストを用意し asyncio.gather に渡します。今回は min_value=0、min_value=20、min_value=50 の場合の SELECT を非同期実行しています。asyncio.gather が結果をまとめてくれます。
今回の SELECT の例は非同期で実行するほどではありませんが、非同期での検索の使い方例として紹介しています。より複雑で負荷の高い SELECT の場合、非同期で実行し、最後に結果をまとめることで効率的に検索ができる可能性があります。
まとめ
Python で asyncpg を用いて非同期に PostgreSQL データベースへアクセスする方法を解説しました。
asyncpg は、PostgreSQL データベースとの非同期通信をサポートする Python のサードパーティ製ライブラリです。asyncpg は高速であることで知られており、今回は asyncpg を使って非同期に PostgreSQL へアクセスするためのクラスを作成してみました。また、作成したクラスの使い方についても例を使って紹介しています。
非同期プログラミングは、I/O バウンドな処理を実装するためには非常に有益な実装方法です。DB アクセスについても適切に使用することができれば、性能向上できる可能性が高くなります。ぜひ、asyncpg について使い方を理解していただき、非同期処理を試してみてもらいたいと思います。
上記で紹介しているソースコードについては GitHub にて公開しています。参考にしていただければと思います。

