asyncpg

【Python】asyncpgを用いた非同期データベース処理 (PostgreSQL)

【Python】asyncpgを用いた非同期データベース処理 (PostgreSQL)

Pythonで、asyncpgを用いて非同期にPostgreSQLデータベースへアクセスする方法を解説します。

asyncpgによる非同期データベース処理 (PostgreSQL)

非同期処理とは、複数のタスクが協調しあって処理を実行するプログラミングの実装方法です。非同期プログラミングは多くの支持を得ており、Python 3.5でもasyncawaitといった非同期プログラミング方法が導入されています。

async/awaitを用いたプログラミングの基本は「async/awaitを用いた非同期プログラミングの基本」でまとめているので参考にしてください。

本記事では、非同期処理での効果が期待できるデータベースアクセスに焦点を当てます。具体的には、asyncpgを用いて非同期にPostgreSQLデータベースへアクセスする方法を紹介します。

asyncpgとは

asyncpgとは、PostgreSQLデータベースの非同期処理をサポートするPythonのサードパーティ製ライブラリです。公式ドキュメントはこちらを参照してください。

非同期プログラミングは、I/Oバウンドな処理を実装するためには非常に有益な実装方法です。I/Oバウンドな処理とは、ファイルへの入出力やネットワーク通信等が該当します。

PostgreSQLにアクセスする際には同期的なパッケージとしてpsycopg2がよく使用されます。psycopg2の使い方については「psycopg2を用いたPostgreSQLデータベースへのアクセス方法」でまとめているので参考にしてください。

PostgreSQL用の非同期ライブラリとしては他にもaiopgというパッケージもあります。aiopgpsycopg2をベースにしているため、psycopg2を利用している人には適しているかもしれません。一方で、asyncpgは高速であることで知られています。今回はasyncpgの使い方について説明していきたいと思います。

asyncpgを用いた非同期通信の基本

この記事では「psycopg2を用いたPostgreSQLデータベースへのアクセス方法」で紹介したPostgreSQLデータベースへのアクセスクラスの非同期版クラスを作成してみたいと思います。

asyncpgのインストール

asyncpgはサードパーティ製ライブラリのため、インストールが必要です。以下のようにpipでインストールをしてください。

pip install asyncpg

インストールが完了したら非同期にPostgreSQLを扱うことが可能です。

サンプルプログラムの前提条件

本記事では、PostgreSQLのインストール自体については説明を省略します。以降で紹介するサンプルプログラムを実行するにあたっての前提条件は以下とします。

  1. PostgreSQLがローカルPCにインストールされているものとする。
  2. DB「test」、スキーマ「work」が作成されているものとする。
  3. ユーザー「test」が作成されており、パスワードは「PAssw0rd」とする。また、workスキーマに対して適切に権限が設定されているものとする。(GRANT ALL ON SCHEMA work TO test;

以降では、設定ファイルの記載も含めて紹介しています。ご利用の環境に合わせて設定ファイルやプログラムは修正して動作させることも可能です。

今回は、asyncpgの使用方法に焦点を当てるため細かな例外処理を省略していますのでご注意ください。実際のアプリケーションの利用では、例外処理を十分に考慮する必要があります。

asyncpgを用いたPostgreSQLアクセスクラス

asyncpgを用いてPostgreSQLデータベースへアクセスするためのクラスを作成します。設定ファイルおよびサンプルプログラムをまず紹介します。具体的な詳細については以降で説明していきます。

【dbconfig.ini】DBアクセス設定ファイル

[POSTGRESQL_DB_SERVER]
host = localhost
port = 5432
dbname = test
user = test
password = PAssw0rd
[POOL]
min_size=1
max_size=10

【db_connect_asyncpg.py】非同期DBアクセス用クラス

import configparser

import asyncpg


class DbConnectAsyncPostgres:
    # 接続プール
    connection_pool = None

    @classmethod
    async def initialize_connection_pool(
        cls, config_file="./dbconfig.ini"
    ) -> None:
        """接続プールの初期化

        Args:
            config_file: 設定ファイルパス

        Returns:
            None
        """
        if cls.connection_pool is None:
            # コンフィグファイルからデータを取得
            config_db = configparser.ConfigParser()
            config_db.read(config_file)

            # 接続情報の取得
            host = config_db["POSTGRESQL_DB_SERVER"]["host"]
            port = config_db["POSTGRESQL_DB_SERVER"]["port"]
            dbname = config_db["POSTGRESQL_DB_SERVER"]["dbname"]
            user = config_db["POSTGRESQL_DB_SERVER"]["user"]
            password = config_db["POSTGRESQL_DB_SERVER"]["password"]
            # プール設定の取得
            min_size = int(config_db["POOL"]["min_size"])
            max_size = int(config_db["POOL"]["max_size"])

            # DSN(Data Source Name)の作成
            dsn = f"postgresql://{user}:{password}@{host}:{port}/{dbname}"
            # 非同期接続プールの初期化
            cls.connection_pool = await asyncpg.create_pool(
                dsn=dsn,
                min_size=min_size,
                max_size=max_size,
            )

    @classmethod
    async def close_connection_pool(cls):
        """接続プールをクローズする"""
        if cls.connection_pool:
            await cls.connection_pool.close()
            cls.connection_pool = None

    def __init__(self) -> None:
        """コンストラクタ"""
        self.conn = None
        self.transaction = None

    async def connect(self) -> None:
        """DB接続"""
        if self.connection_pool is None:
            raise Exception("接続プールが初期化されていません")

        # 接続プールから非同期でコネクションを取得
        self.conn = await self.connection_pool.acquire()

    async def close(self) -> None:
        """DBクローズ"""
        if self.conn:
            # 接続プールに返却する
            await self.connection_pool.release(self.conn)
            self.conn = None

    async def __aenter__(self):
        # DB接続
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # DBクローズ
        await self.close()

    async def execute_non_query(
        self, sql: str, bind_var: tuple = None
    ) -> None:
        """CREATE/INSERT/UPDATE/DELETEのSQL実行メソッド

        Args:
            sql: 実行SQL
            bind_var: バインド変数

        Returns:
            None
        """
        # SQLの実行 (bind_varがNoneの場合は()を設定し、アンパックして渡す)
        await self.conn.execute(sql, *(bind_var or ()))

    async def execute_query(
        self, sql: str, bind_var: tuple = None, count: int = 0
    ) -> list[asyncpg.Record]:
        """SELECTのSQL実行メソッド

        Args:
            sql: 実行SQL
            bind_var: バインド変数
            count: データ取得件数 (0は全ての行を取得)

        Returns:
            結果リスト (asyncpg.Record型)
        """
        stmt = await self.conn.prepare(sql)

        # データ取得件数が指定されている場合
        if count > 0:
            rows = await stmt.fetch(count, *(bind_var or ()))
        else:
            rows = await stmt.fetch(*(bind_var or ()))

        return rows

    async def start_transaction(self) -> None:
        """トランザクションの開始"""
        # コネクションでトランザクションを開始する
        self.transaction = self.conn.transaction()
        # トランザクションを開始する
        await self.transaction.start()

    async def commit_transaction(self) -> None:
        """トランザクションのコミット"""
        await self.transaction.commit()

    async def rollback_transaction(self) -> None:
        """トランザクションのロールバック"""
        await self.transaction.rollback()

サンプルコードの詳細説明

DB接続設定ファイル dbconfig.ini

データベース(DB)へアクセスする際には、接続のための設定情報が必要です。DBの設定情報は、DBアクセス設定ファイルとしてdbconfig.iniとして用意します。

このファイルは、以降で紹介するプログラム内でconfigparserモジュールを用いて内容を取得します。configparserの使用方法については「configparserによる設定ファイル管理」にまとめていますので参考にしてください。

[POSTGRESQL_DB_SERVER]
host = localhost
port = 5432
dbname = test
user = test
password = PAssw0rd
[POOL]
min_size=1
max_size=10

設定ファイルには以下の情報を記載しています。

設定値概要
host具体的なサーバーのホスト名やIPアドレスを記載します。今回はローカルPCを使うためlocalhostとしています。
portPostgreSQLへアクセスするためのポートです。PostgreSQLではデフォルトで5432が使用されます。お使いの環境で変更している場合は修正が必要です。
dbnameアクセスするデータベース名を記載します。
userアクセスするユーザーを記載します。
passwordアクセス時のパスワードを記載します。

以下の項目については、データベースへの接続プールに関する設定項目です。

設定値概要
min_size接続プールにおける最小コネクション数を表します。
max_size接続プールにおける最大コネクション数を表します。

今回は簡単のためパスワードは平文のまま記載していますが、セキュリティ上適切ではありません。実際には暗号化等を考慮する必要がある点に注意してください。

非同期DBアクセス用クラス DbConnectAsyncPostgres

DBアクセス用のクラスとしてDbConnectAsyncPostgresというクラスを定義しています。このクラスをDBを使用するプログラムから呼び出すことで非同期にPostgreSQLへアクセスできます。また、このクラスは接続プールやコンテキストマネージャーに対応しています。

定義メソッドの概要は以下になります。

メソッド名概要
initialize_connection_pool接続プールを作成する初期化メソッドです。設定ファイルから接続状況を読み込み、接続プールを作成します。
このメソッドはclassmethodとして定義し、アプリケーション起動時に呼び出します。
close_connection_pool接続プールをクローズするメソッドです。
このメソッドはclassmethodとして定義し、アプリケーション終了時に呼び出します。
__init__DbConnectAsyncPostgresクラスのコンストラクタです。
connect接続プールから非同期にコネクションを取得します。
closeデータベースのクローズ処理を行います。コネクションは、接続プールへ返却します。
__aenter__with句を実行する際に呼び出される非同期に対応したコンテキストマネージャーのメソッドです。
connectメソッドを呼び出すことでDB接続を行い、戻り値をasで指定された変数に反映します。
__aexit__with句を抜ける時に呼び出される非同期に対応したコンテキストマネージャーのメソッドです。例外が発生した場合には、exc_typeexc_valexc_tbに例外情報が渡されます。
例外発生時はロールバックし、例外がなければコミットを行ったうえで、DB接続をクローズします。
execute_non_queryCREATE/INSERT/UPDATE/DELETE処理用のSQL実行メソッドです。SQL文と必要に応じてバインド変数を渡して処理を実行します。
execute_querySELECT処理用のSQL実行メソッドです。返却値としては辞書のリストを返します。SQL文と必要に応じてバインド変数、データ取得件数を渡して処理を実行します。
start_transactionトランザクションを開始するメソッドです。
commit_transactionコミットを行うメソッドです。
rollback_transactionロールバックを行うメソッドです。

それぞれのメソッドのポイントについて以降で説明していきます。なお、非同期に実行される関数については「async def」で定義されていることに注意してください。また、各関数内で呼び出す際にも非同期に呼び出す場合には「await」キーワードを指定することが重要です。

接続プールの初期化:initialize_connection_pool

initialize_connection_poolは、接続プールの初期化を行います。初期化の際には、設定ファイルからDBの接続情報と接続プールの設定値を取得します。

    # 接続プール
    connection_pool = None

    @classmethod
    async def initialize_connection_pool(
        cls, config_file="./dbconfig.ini"
    ) -> None:
        """接続プールの初期化

        Args:
            config_file: 設定ファイルパス

        Returns:
            None
        """
        if cls.connection_pool is None:
            # コンフィグファイルからデータを取得
            config_db = configparser.ConfigParser()
            config_db.read(config_file)

            # 接続情報の取得
            host = config_db["POSTGRESQL_DB_SERVER"]["host"]
            port = config_db["POSTGRESQL_DB_SERVER"]["port"]
            dbname = config_db["POSTGRESQL_DB_SERVER"]["dbname"]
            user = config_db["POSTGRESQL_DB_SERVER"]["user"]
            password = config_db["POSTGRESQL_DB_SERVER"]["password"]
            # プール設定の取得
            min_size = int(config_db["POOL"]["min_size"])
            max_size = int(config_db["POOL"]["max_size"])

            # DSN(Data Source Name)の作成
            dsn = f"postgresql://{user}:{password}@{host}:{port}/{dbname}"
            # 非同期接続プールの初期化
            cls.connection_pool = await asyncpg.create_pool(
                dsn=dsn,
                min_size=min_size,
                max_size=max_size,
            )

initialize_connection_poolは、@classmethodでクラスメソッドとして定義し、アプリケーション起動時に1度呼び出します。コンストラクタ(__init__)で定義しないのはインスタンス化のたびに接続プールを作成する必要はないためです。

接続プールを作成するには、asyncpg.create_poolを使用します。接続時にはDSN(Data Source Name)を指定しますが、postgresql://{user}:{password}@{host}:{port}/{dbname}という形式をとります。今回の場合、具体的には「postgresql://test:PAssw0rd@localhost:5432/test」ということになります。

また、min_sizeは最小接続数、max_sizeは最大接続数を表しており、今回の設定ファイルでは、min_size=1max_size=10としているので、最大10のコネクションを使用することが可能です。

Note

最大接続数を超えた場合の挙動

asyncpgの接続プールでは、最大コネクションに達した場合に他のコネクションを使用しようとした場合には、コネクションが解放されるまで待ちます。psycopg2の接続プールでは、最大を超えるコネクションを接続をしようとするとエラーとなるので挙動に違いがあります。

asyncpgは、可能な限り効率的にリソースを利用しようとしますが、psycopg2ではそのような状況を許容しません。これは非同期処理を中心に設計されたasyncpgと、より伝統的な同期処理を主とするpsycopg2の設計思想の違いがあるためです。

接続プールのクローズ:close_connection_pool

接続プールについては、アプリケーションで不要になったときにcloseを使用してクローズします。

    @classmethod
    async def close_connection_pool(cls):
        """接続プールをクローズする"""
        if cls.connection_pool:
            await cls.connection_pool.close()
            cls.connection_pool = None

接続プールをクローズする際にはcloseメソッドを使用して、すべての接続をクローズします。connection_poolNoneを設定することは必須ではありませんが、ガーベージコレクションの対象となることを示すためにNoneを設定しています。

コンストラクタ:__init__

コンストラクタ(__init__)では、DBへの接続インスタンスごとにコネクションとトランザクション用の変数を用意しておきます。具体的なコネクション取得は、以降のconnectメソッドで行います。

    def __init__(self) -> None:
        """コンストラクタ"""
        self.conn = None
        self.transaction = None

DB接続:connect

connectメソッドでは、接続プールからコネクションを取得します。

    async def connect(self) -> None:
        """DB接続"""
        if self.connection_pool is None:
            raise Exception("接続プールが初期化されていません")

        # 接続プールから非同期でコネクションを取得
        self.conn = await self.connection_pool.acquire()

コネクションを取得する際には、接続プールから「self.connection_pool.acquire()」というようにacquireメソッドを使用してコネクションを取得します。

DBクローズ:close

closeメソッドでは、データベースのクローズ処理を行います。

    async def close(self) -> None:
        """DBクローズ"""
        if self.conn:
            # 接続プールに返却する
            await self.connection_pool.release(self.conn)
            self.conn = None

コネクションについては、接続プールに返却するためにreleaseメソッドにコネクションを渡します。connNoneを設定することは必須ではありませんが、ガーベージコレクションの対象となることを示すためにNoneを設定しています。

コンテキストマネージャ:__aenter__ 及び __aexit__

DbConnectAsyncPostgresクラスは、with句を使用できるようにコンテキストマネージャーに対応して実装します。コンテキストマネージャーに対応するためには、具体的には__aenter__メソッドと__aexit__メソッドの実装が必要です。

コンテキストマネージャーの考え方については「コンテキストマネージャーの基本」でまとめているので参考にしてください。ただし、コンテキストマネージャに対応する関数は、__aenter____aexit__というように「a」がついた非同期用のメソッドになっている違いがあることに注意してください。

    async def __aenter__(self):
        # DB接続
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):        
        # DBクローズ
        await self.close()

__aenter__メソッドでは、DB接続を行います。これによりwith句に入った際にDB接続を行い、asで指定された変数にDBクラスのインスタンスを設定します。

with句を抜ける際には、__aexit__メソッドが呼び出されます。この際にcloseメソッドでデータベースのクローズ処理を行います。

Note

asyncpgでは、自動コミットモードを採用しています。そのため、SQLを実行したタイミングで自動でコミットされます。これは1つのSQLをトランザクションと考えていることに相当します。

もし、複数のSQLで1つのトランザクションととらえたい場合には、以降で説明するstart_transactioncommit_transactionrollback_transactionを明示的に使用する必要があります。

なお、同期的なパッケージであるpsycopg2では、自動コミットモードはデフォルトでOFFになっているため、明示的にコミットが必要です。この点で各パッケージには考え方が違うことを覚えておいてください。

SQL実行:execute_non_query

SQL実行用のメソッドとしてexecute_non_queryを用意しています。後述するexecute_queryとの違いは返却値がないことで、CREATE/INSERT/UPDATE/DELETEなどのSQLを実行するために使用します。

    async def execute_non_query(
        self, sql: str, bind_var: tuple = None
    ) -> None:
        """CREATE/INSERT/UPDATE/DELETEのSQL実行メソッド

        Args:
            sql: 実行SQL
            bind_var: バインド変数

        Returns:
            None
        """
        # SQLの実行 (bind_varがNoneの場合は()を設定し、アンパックして渡す)
        await self.conn.execute(sql, *(bind_var or ()))

SQLを実行するためにはコネクションのexecuteメソッドに対象のSQLを渡します。また、バインド変数がある場合には引数に指定して実行することができます。

バインド変数は、アンパックして渡す必要がありますがorを使用して簡潔に記載しています。この記載方法では、bind_varNoneの場合は()が渡されます。

SQL実行:execute_query

SQL実行用のメソッドとしてもう一つexecute_queryを用意しています。上記のexecute_non_queryとの違いは、返却値があることです。SELECTしてデータを取得するSQLを実行するために使用します。

    async def execute_query(
        self, sql: str, bind_var: tuple = None, count: int = 0
    ) -> list[asyncpg.Record]:
        """SELECTのSQL実行メソッド

        Args:
            sql: 実行SQL
            bind_var: バインド変数
            count: データ取得件数 (0は全ての行を取得)

        Returns:
            結果リスト (asyncpg.Record型)
        """
        stmt = await self.conn.prepare(sql)

        # データ取得件数が指定されている場合
        if count > 0:
            rows = await stmt.fetch(count, *(bind_var or ()))
        else:
            rows = await stmt.fetch(*(bind_var or ()))

        return rows

SQLを実行するためには、コネクションのfetchメソッドに対象となるSQLを渡しますが、その前にprepareメソッドでSQLをstmtとして準備します。

SELECT結果を全て取得する場合は、stmt.fetchにバインド変数のみ渡して呼び出します。countが指定されていいる(>0)の場合には、countを引数に指定することでその件数を取得することができます。

なお、返却のrowsは、asyncpg.Record型のリストとなります。必要に応じて結果をdict()でPythonの辞書に変換することも可能です。

トランザクションの開始:start_transaction

asyncpgでは、自動コミットモードを採用しています。そのため、SQLを実行したタイミングで自動でコミットされますが、複数のSQLで1つのトランザクションととらえたい場合があります。このような場合には、以下のstart_transactionを使用します。

    async def start_transaction(self) -> None:
        """トランザクションの開始"""
        # コネクションでトランザクションを開始する
        self.transaction = self.conn.transaction()
        # トランザクションを開始する
        await self.transaction.start()

コネクションからトランザクションを開始するには、コネクションのtransactionメソッドにより接続からトランザクションを用意し、startメソッドでトランザクションを開始します。

コミット:commit_transaction

start_transactionによりトランザクションを開始した場合には、適切な位置でコミットを明示的に行う必要があります。この場合には、commit_transactionを使用します。

    async def commit_transaction(self) -> None:
        """トランザクションのコミット"""
        await self.transaction.commit()

内部的には、トランザクションのcommitメソッドを呼び出しているだけです。もし、コミット時に他の処理が必要な場合は追加してください。

ロールバック:rollback_transaction

コミット前のデータベースの変更をトランザクション開始時点まで戻したい場合は、ロールバックしてデータベースの状態を元に戻す必要があります。この場合には、rollback_transactionを使用します。

    async def rollback_transaction(self) -> None:
        """トランザクションのロールバック"""
        await self.transaction.rollback()

内部的には、トランザクションのrollbackメソッドを呼び出しているだけです。もし、ロールバック時に他の処理が必要な場合は追加してください。

DBアクセスクラスの使い方

上記で定義したDbConnectAsyncPostgresクラスは、他のプログラムから使用することが可能です。以降ではは、DbConnectPostgresクラスを使用して、データの追加(INSERT)、データ検索(SELECT)の使い方例を紹介します。

基本的な使い方 (自動コミットモードを使用)

データをINSERTする

データを非同期に登録(INSERT)したい場合には、以下のようにします。

import asyncio

from db_connect_asyncpg import DbConnectAsyncPostgres


async def insert_data(value1, value2):
    """データをインサートする (自動コミットモードを使用する場合)"""
    async with DbConnectAsyncPostgres() as db:
        insert_sql = (
            "INSERT INTO work.sample_table "
            "(str1, value1, last_update_datetime) "
            "VALUES($1, $2, current_timestamp)"
        )
        await db.execute_non_query(insert_sql, (value1, value2))


async def main():
    # 接続プールを初期化する
    await DbConnectAsyncPostgres.initialize_connection_pool()

    # 非同期タスクを並行実行
    tasks = [insert_data(f"test_str{i}", 10 * i) for i in range(10)]
    await asyncio.gather(*tasks)

    # 接続プールをクローズする
    await DbConnectAsyncPostgres.close_connection_pool()


if __name__ == "__main__":
    # asyncio.runを使用してメインコルーチンを実行
    asyncio.run(main())

以降でポイントを説明していきます。

モジュールのインポート

DBアクセスクラスを使うには、作成したモジュールを以下のようにインポートしてください。

from db_connect_asyncpg import DbConnectAsyncPostgres

接続プールの初期化とクローズ

今回のクラスでは接続プールを使用していますので、使用前に接続プールを初期化する必要があります。

    # 接続プールを初期化する
    await DbConnectAsyncPostgres.initialize_connection_pool()

    ...(省略)...

    # 接続プールをクローズする
    await DbConnectAsyncPostgres.close_connection_pool()

接続プールは、プールからコネクションを取得して使うという考え方であるため、アプリケーション起動時に1度作成し、アプリケーション終了時にクローズするようにしてください。

非同期INSERT関数

非同期なINSERT用関数としてinsert_data関数を定義しています。

async def insert_data(value1, value2):
    """データをインサートする (自動コミットモードを使用する場合)"""
    async with DbConnectAsyncPostgres() as db:
        insert_sql = (
            "INSERT INTO work.sample_table "
            "(str1, value1, last_update_datetime) "
            "VALUES($1, $2, current_timestamp)"
        )
        await db.execute_non_query(insert_sql, (value1, value2))

※[注意] 上記コードについて、使用しているプラグインの影響でうまく表示されず見た目上「VALUES(1,2, current_timestamp)」の表示になってしまっているかと思いますが、正確には「VALUES($1, $2, current_timestamp)」です。

今回作成したクラスは、コンテキストマネージャーを使用できるため、with句を使ってインスタンスを作成することができます。

with句に入った際に、connectメソッドにより接続プールからコネクションが取得され、with句を抜ける際にコネクションは返却されます。asyncpgは、自動コミットモードとなっているため、execute_non_queryが実行されたタイミングで、すぐにコミットされます。

また、SQLインジェクションを防ぐ手段の一つであるプレースホルダーですが、asyncpgでは値を埋め込むためのSQLのプレースホルダーとして$1$2といった表記を使用し、引数に渡すタプルの値が順に設定されます。psycopg2では、%sといった表記であり少し異なっていることに注意が必要です。


非同期タスクの作成と実行

メインコルーチン(main)の中では、接続プールを生成した後にタスクを作成しています。

    # 非同期タスクを並行実行
    tasks = [insert_data(f"test_str{i}", 10 * i) for i in range(10)]
    await asyncio.gather(*tasks)

タスクは、リスト内包表記を使って10個のタスクを用意しています。このリストを非同期に実行する際にasyncio.gatherを使用します。これにより、各タスクは非同期に実行されます。

トランザクションを明確に管理する使い方

asyncpgは、自動コミットモードを採用しているため、SQLを実行したタイミングで自動的にコミットまたはロールバックが行われます。これは1つのSQLを1トランザクションととらえていることと同じです。

しかし、実際には複数のSQLを1トランザクションとして考えたい場合もあります。そのようなときには、以下のように明示的にトランザクションの開始やコミットを実行することができます。

import asyncio

from db_connect_asyncpg import DbConnectAsyncPostgres


async def insert_data_with_transaction(value1, value2):
    """データをインサートする (トランザクションを明示的に管理する場合)"""

    async with DbConnectAsyncPostgres() as db:
        try:
            # トランザクションの開始
            await db.start_transaction()

            # インサートを実行
            insert_sql = (
                "INSERT INTO work.sample_table "
                "(str1, value1, last_update_datetime) "
                "VALUES($1, $2, current_timestamp)"
            )
            await db.execute_non_query(insert_sql, (value1, value2))

            # コミット
            await db.commit_transaction()
        except Exception as ex:
            await db.rollback_transaction()
            print(f"エラー発生: {ex}")


async def main():
    # 接続プールを初期化する
    await DbConnectAsyncPostgres.initialize_connection_pool()

    # 非同期タスクを並行実行
    tasks = [
        insert_data_with_transaction(f"test_str{i}", 10 * i) for i in range(10)
    ]
    await asyncio.gather(*tasks)

    # 接続プールをクローズする
    await DbConnectAsyncPostgres.close_connection_pool()


if __name__ == "__main__":
    # asyncio.runを使用してメインコルーチンを実行
    asyncio.run(main())

※[注意] 上記コードについて、使用しているプラグインの影響でうまく表示されず見た目上「VALUES(1,2, current_timestamp)」の表示になってしまっているかと思いますが、正確には「VALUES($1, $2, current_timestamp)」です。

基本的な使い方は先に説明したプログラムと同様ですが、async withのブロックの中で明示的にstart_transactionによりトランザクションを開始してます。

トランザクションを明示的に開始した場合には、execute_non_queryを実行したタイミングではコミットは行われません。そのため、明示的にcommit_transactionでコミットを行う必要があります。また、try...exceptを用いることで例外をキャッチした場合には、rollback_transactionによりロールバックする構成となっています。

このように明示的にトランザクションの範囲を制御することで、複数のSQLの実行を1トランザクションとして使うことも可能になります。

今回は、INSERTの例を紹介しましたが、UPDATEDELETEについても同様の考え方で使用することが可能です。

データを検索する

今回作成したクラスでデータを検索(SELECT)する例についても見てみましょう。複数のSELECTを非同期に実行する場合には、以下のように実行します。

import asyncio
from pprint import pprint

from db_connect_asyncpg import DbConnectAsyncPostgres


async def select_data(target_id):
    """データを検索する"""
    async with DbConnectAsyncPostgres() as db:
        select_sql = "SELECT * FROM work.sample_table WHERE id = $1"
        result = await db.execute_query(select_sql, (target_id,))

        return result


async def main():
    # 接続プールを初期化する
    await DbConnectAsyncPostgres.initialize_connection_pool()

    # 非同期タスクを並行実行
    tasks = [
        select_data(1),
        select_data(2),
        select_data(3),
    ]
    results = await asyncio.gather(*tasks)
    pprint(results)

    # 接続プールをクローズする
    await DbConnectAsyncPostgres.close_connection_pool()


if __name__ == "__main__":
    # asyncio.runを使用してメインコルーチンを実行
    asyncio.run(main())
【実行結果】
[[<Record id=1 str1='test_str0' value1=0 last_update_datetime=datetime.datetime(2024, 3, 22, 6, 47, 5, 665664)>],
 [<Record id=2 str1='test_str1' value1=10 last_update_datetime=datetime.datetime(2024, 3, 22, 6, 47, 5, 840874)>],
 [<Record id=3 str1='test_str3' value1=30 last_update_datetime=datetime.datetime(2024, 3, 22, 6, 47, 5, 966835)>]]

SELECTのための関数として、select_dataという非同期関数を用意しています。これは特定のidのデータを検索する関数です。この関数ではexecute_queryにSQLとバインド変数として受け取ったidを渡してSELECTを実行し、実行結果を返却します。

非同期タスクの実行についても、これまでと同様でタスクのリストを用意し、asyncio.gatherに渡します。今回はid=1id=2id=3の場合のSELECTを非同期に実行しています。asyncio.gatherは結果をまとめてくれます。

今回のSELECTの例は非同期で実行する必要もないようなものかもしれませんが、非同期での検索の使い方を説明するためということで理解してください。より複雑で負荷の高いSELECTの場合、非同期で実行して最後に結果をまとめることで効率的に検索ができる可能性があります。

まとめ

Pythonで、asyncpgを用いて非同期にPostgreSQLデータベースへアクセスする方法を解説しました。

asyncpgとは、PostgreSQLデータベースとの非同期通信をサポートするPythonのサードパーティ製ライブラリです。asyncpgは高速であることで知られており、今回はasyncpgを使って非同期にPostgreSQLへアクセスするためのクラスを作成してみました。また、作成したクラスの使い方についても例を使って紹介しています。

非同期プログラミングは、I/Oバウンドな処理を実装するためには非常に有益な実装方法です。DBアクセスについても適切に使用することができれば、性能向上できる可能性が高くなります。ぜひ、asyncpgについて使い方を理解していただき、非同期処理をしてみてもらいたいと思います。