asyncpg

【Python】asyncpgを用いた非同期データベース処理 (PostgreSQL)

【Python】asyncpgを用いた非同期データベース処理 (PostgreSQL)
naoki-hn

Python で asyncpg を用いて非同期に PostgreSQL データベースへアクセスする方法を解説します。

asyncpg による非同期データベース処理 (PostgreSQL)

非同期処理とは、複数のタスクが協調しあって処理を実行するプログラミングの実装方法です。非同期プログラミングは多くの支持を得ており、Python 3.5 でも asyncawait といった非同期プログラミング方法が導入されています。

async / await を用いたプログラミングの基本は「async/awaitを用いた非同期プログラミングの基本」でまとめているので参考にしてください。

この記事では、非同期処理での効果が期待できる DB アクセスに焦点を当てます。具体的には、asyncpg を用いて非同期に PostgreSQL データベースへアクセスする方法を紹介します。

asyncpg とは

asyncpg は、PostgreSQL データベースの非同期処理をサポートする Python のサードパーティ製ライブラリです。公式ドキュメントはこちらを参照してください。

非同期プログラミングは、I/O バウンドな処理を実装するためには非常に有益な実装方法です。I/O バウンドな処理は、ファイル入出力やネットワーク通信等が該当します。

PostgreSQL にアクセスする際には同期的なパッケージとして psycopg2 がよく使用されます。また、PostgreSQL 用の非同期ライブラリとしては他にも aiopg というパッケージもあります。aiopgpsycopg2 をベースにしているため、psycopg2 を利用している人には適しているかもしれません。一方で、asyncpg は高速であることで知られています。

今回は asyncpg の使い方について説明していきたいと思います。

psycopg2 の使い方は「psycopg2を用いたPostgreSQLデータベースへのアクセス方法」を参考にしてください。

asyncpg を用いた非同期通信の基本

この記事では「psycopg2を用いたPostgreSQLデータベースへのアクセス方法」で紹介した PostgreSQL データベースへのアクセスクラスの非同期版クラスを作成してみたいと思います。

asyncpg のインストール

asyncpg はサードパーティ製ライブラリのため、インストールが必要です。以下のように pip でインストールをしてください。

pip install asyncpg

インストールが完了したら非同期に PostgreSQL を扱うことが可能です。

サンプルプログラムの前提条件

この記事では PostgreSQL のインストール自体については説明を省略します。以降で紹介するサンプルプログラムを実行するにあたっての前提条件は以下とします。

  1. PostgreSQL がローカル PC にインストールされているものとします。
  2. DB「test」、スキーマ「work」が作成されているものとします。
  3. ユーザー「test」、パスワードは「PAssw0rd」とし、work スキーマに対して適切に権限が設定されているものとします。
    GRANT ALL ON SCHEMA work TO test;

以降では、設定ファイルの記載も含めて紹介しています。ご利用の環境に合わせて設定ファイルやプログラムは修正して動作させることも可能です。

注意点

asyncpg の使用方法に焦点を当てるため細かな例外処理を省略します。実アプリケーションでの利用では、例外処理を十分検討してください。

asyncpg を用いた PostgreSQL アクセスクラス

asyncpg を用いて PostgreSQL データベースへアクセスするためのクラスを作成します。設定ファイルおよびサンプルプログラムをまず紹介します。具体的な詳細については以降で説明していきます。

dbconfig.iniDB アクセス設定ファイル

[POSTGRESQL_DB_SERVER]
host = localhost
port = 5432
dbname = test
user = test
password = PAssw0rd
[POOL]
min_size=1
max_size=10

db_connect_asyncpg.py非同期 DB アクセス用クラス

import configparser

import asyncpg


class DbConnectAsyncPostgres:
    # 接続プール
    connection_pool = None

    @classmethod
    async def initialize_connection_pool(
        cls, config_file="./dbconfig.ini"
    ) -> None:
        """接続プールの初期化

        Args:
            config_file: 設定ファイルパス

        Returns:
            None
        """
        if cls.connection_pool is None:
            # コンフィグファイルからデータを取得
            config_db = configparser.ConfigParser()
            config_db.read(config_file)

            # 接続情報の取得
            host = config_db["POSTGRESQL_DB_SERVER"]["host"]
            port = config_db["POSTGRESQL_DB_SERVER"]["port"]
            dbname = config_db["POSTGRESQL_DB_SERVER"]["dbname"]
            user = config_db["POSTGRESQL_DB_SERVER"]["user"]
            password = config_db["POSTGRESQL_DB_SERVER"]["password"]
            # プール設定の取得
            min_size = int(config_db["POOL"]["min_size"])
            max_size = int(config_db["POOL"]["max_size"])

            # DSN(Data Source Name)の作成
            dsn = f"postgresql://{user}:{password}@{host}:{port}/{dbname}"
            # 非同期接続プールの初期化
            cls.connection_pool = await asyncpg.create_pool(
                dsn=dsn,
                min_size=min_size,
                max_size=max_size,
            )

    @classmethod
    async def close_connection_pool(cls):
        """接続プールをクローズする"""
        if cls.connection_pool:
            await cls.connection_pool.close()
            cls.connection_pool = None

    def __init__(self) -> None:
        """コンストラクタ"""
        self.conn = None
        self.transaction = None

    async def connect(self) -> None:
        """DB接続"""
        if self.connection_pool is None:
            raise Exception("接続プールが初期化されていません")

        # 接続プールから非同期でコネクションを取得
        self.conn = await self.connection_pool.acquire()

    async def close(self) -> None:
        """DBクローズ"""
        if self.conn:
            # 接続プールに返却する
            await self.connection_pool.release(self.conn)
            self.conn = None

    async def __aenter__(self):
        # DB接続
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # DBクローズ
        await self.close()

    async def execute_non_query(
        self, sql: str, bind_var: tuple = None
    ) -> None:
        """CREATE/INSERT/UPDATE/DELETEのSQL実行メソッド

        Args:
            sql: 実行SQL
            bind_var: バインド変数

        Returns:
            None
        """
        # SQLの実行 (bind_varがNoneの場合は()を設定し、アンパックして渡す)
        await self.conn.execute(sql, *(bind_var or ()))

    async def execute_query(
        self, sql: str, bind_var: tuple = None, count: int = 0
    ) -> list[asyncpg.Record]:
        """SELECTのSQL実行メソッド

        Args:
            sql: 実行SQL
            bind_var: バインド変数
            count: データ取得件数 (0は全ての行を取得)

        Returns:
            結果リスト (asyncpg.Record型)
        """
        stmt = await self.conn.prepare(sql)

        # データ取得件数が指定されている場合
        if count > 0:
            rows = await stmt.fetch(count, *(bind_var or ()))
        else:
            rows = await stmt.fetch(*(bind_var or ()))

        return rows

    async def start_transaction(self) -> None:
        """トランザクションの開始"""
        # コネクションでトランザクションを開始する
        self.transaction = self.conn.transaction()
        # トランザクションを開始する
        await self.transaction.start()

    async def commit_transaction(self) -> None:
        """トランザクションのコミット"""
        await self.transaction.commit()

    async def rollback_transaction(self) -> None:
        """トランザクションのロールバック"""
        await self.transaction.rollback()

サンプルコードの詳細説明

DB 接続設定ファイル dbconfig.ini

データベースへアクセスする際には、接続のための設定情報が必要です。DB の設定情報は、DB アクセス設定ファイルとして dbconfig.ini として用意します。

[POSTGRESQL_DB_SERVER]
host = localhost
port = 5432
dbname = test
user = test
password = PAssw0rd
[POOL]
min_size=1
max_size=10

設定ファイルには以下の情報を記載しています。

設定値概要
host具体的なサーバーのホスト名や IP アドレスを記載します。今回はローカル PC を使うため localhost としています。
portPostgreSQL へアクセスするためのポートです。PostgreSQL ではデフォルトで 5432 が使用されます。お使いの環境で変更している場合は修正が必要です。
dbnameアクセスするデータベース名を記載します。
userアクセスするユーザーを記載します。
passwordアクセス時のパスワードを記載します。

以下の項目については、データベースへの接続プールに関する設定項目です。

設定値概要
min_size接続プールの最小コネクション数を表します。
max_size接続プールの最大コネクション数を表します。
注意点

簡単のためパスワードは平文のまま記載していますが、セキュリティ上のリスクがあります。実際には暗号化等の対策を検討してください。

非同期DBアクセス用クラス DbConnectAsyncPostgres

DB アクセス用のクラスとして DbConnectAsyncPostgres クラスを定義しています。このクラスを DB を使用するプログラムから呼び出すことで非同期に PostgreSQL へアクセスできます。また、このクラスは接続プールやコンテキストマネージャーに対応しています。

定義メソッドの概要は以下になります。

メソッド名概要
initialize_connection_pool接続プールを作成する初期化メソッドです。設定ファイルから接続状況を読み込み、接続プールを作成します。
このメソッドは classmethod として定義し、アプリケーション起動時に呼び出します。
close_connection_pool接続プールをクローズするメソッドです。
このメソッドは classmethod として定義し、アプリケーション終了時に呼び出します。
__init__DbConnectAsyncPostgres クラスのコンストラクタです。
connect接続プールから、非同期にコネクションを取得します。
closeデータベースのクローズ処理を行います。コネクションは、接続プールへ返却します。
__aenter__with 句を実行する際に呼び出される非同期に対応したコンテキストマネージャーのメソッドです。
connect メソッドを呼び出すことで DB 接続を行い、戻り値を as で指定した変数に反映します。
__aexit__with 句を抜ける時に呼び出される非同期に対応したコンテキストマネージャーのメソッドです。例外が発生した場合には、exc_typeexc_valexc_tb に例外情報が渡されます。
例外発生時はロールバックし、例外がなければコミットしたうえで、DB 接続をクローズします。
execute_non_queryCREATE / INSERT / UPDATE / DELETE 処理用の SQL 実行メソッドです。SQL 文と必要に応じてバインド変数を渡して処理を実行します。
execute_querySELECT 処理用の SQL 実行メソッドです。返却値としては辞書のリストを返します。SQL 文と必要に応じてバインド変数、データ取得件数を渡して処理を実行します。
start_transactionトランザクションを開始するメソッドです。
commit_transactionコミットを行うメソッドです。
rollback_transactionロールバックを行うメソッドです。

それぞれのメソッドのポイントについて以降で説明します。なお、非同期に実行される関数については「async_def」で定義されることに注意してください。また、各関数内で呼び出す際も、非同期に呼び出す場合には「await」キーワードが必要です。

接続プールの初期化:initialize_connection_pool

initialize_connection_pool は、接続プールの初期化を行います。初期化の際には、設定ファイルから DB の接続情報と接続プールの設定値を取得します。

    # 接続プール
    connection_pool = None

    @classmethod
    async def initialize_connection_pool(
        cls, config_file="./dbconfig.ini"
    ) -> None:
        """接続プールの初期化

        Args:
            config_file: 設定ファイルパス

        Returns:
            None
        """
        if cls.connection_pool is None:
            # コンフィグファイルからデータを取得
            config_db = configparser.ConfigParser()
            config_db.read(config_file)

            # 接続情報の取得
            host = config_db["POSTGRESQL_DB_SERVER"]["host"]
            port = config_db["POSTGRESQL_DB_SERVER"]["port"]
            dbname = config_db["POSTGRESQL_DB_SERVER"]["dbname"]
            user = config_db["POSTGRESQL_DB_SERVER"]["user"]
            password = config_db["POSTGRESQL_DB_SERVER"]["password"]
            # プール設定の取得
            min_size = int(config_db["POOL"]["min_size"])
            max_size = int(config_db["POOL"]["max_size"])

            # DSN(Data Source Name)の作成
            dsn = f"postgresql://{user}:{password}@{host}:{port}/{dbname}"
            # 非同期接続プールの初期化
            cls.connection_pool = await asyncpg.create_pool(
                dsn=dsn,
                min_size=min_size,
                max_size=max_size,
            )

initialize_connection_pool は、@classmethod でクラスメソッドとして定義し、アプリケーション起動時に 1 度呼び出します。コンストラクタ (__init__) で定義しないのはインスタンス化のたびに接続プールを作成する必要はないためです。

接続プールを作成するには asyncpg.create_pool を使用します。接続時にはDSN (Data Source Name) を指定しますが postgresql://{user}:{password}@{host}:{port}/{dbname} という形式をとります。今回の場合、具体的には「postgresql://test:PAssw0rd@localhost:5432/test」となります。

min_size は最小接続数、max_size は最大接続数を表し、設定ファイルで min_size=1max_size=10 としているので最大 10 のコネクションを使用可能です。

最大接続数を超えた場合の挙動

asyncpg の接続プールでは、最大コネクションに達した場合に他のコネクションを使用しようとした場合、コネクションが解放されるまで待ちます。psycopg2 の接続プールでは、最大を超えるコネクションを接続しようとするとエラーになるため挙動が違います。

asyncpg は、可能な限り効率的にリソースを利用しようとしますが、psycopg2 ではそのような状況を許容しません。これは、非同期処理の中心に設計された asyncpg と、より伝統的な同期処理を主とする psycopg2 の設計思想に違いがあるためです。

接続プールのクローズ:close_connection_pool

接続プールについては、アプリケーションで不要になったときに close を使用してクローズします。

    @classmethod
    async def close_connection_pool(cls):
        """接続プールをクローズする"""
        if cls.connection_pool:
            await cls.connection_pool.close()
            cls.connection_pool = None

接続プールをクローズする際には close メソッドを使用して、すべての接続をクローズします。connection_poolNone を設定することは必須ではありませんが、ガーベージコレクションの対象となることを示すために None を設定しています。

コンストラクタ:__init__

コンストラクタ (__init__) では、DB への接続インスタンスごとにコネクションとトランザクション用の変数を用意しておきます。具体的なコネクション取得は、以降の connect メソッドで行います。

    def __init__(self) -> None:
        """コンストラクタ"""
        self.conn = None
        self.transaction = None

DB 接続:connect

connect メソッドでは、接続プールからコネクションを取得します。

    async def connect(self) -> None:
        """DB接続"""
        if self.connection_pool is None:
            raise Exception("接続プールが初期化されていません")

        # 接続プールから非同期でコネクションを取得
        self.conn = await self.connection_pool.acquire()

コネクションを取得する際には、接続プールから「self.connection_pool.acquire()」というように acquire メソッドを使用してコネクションを取得します。

DB クローズ:close

close メソッドでは、データベースのクローズ処理を行います。

    async def close(self) -> None:
        """DBクローズ"""
        if self.conn:
            # 接続プールに返却する
            await self.connection_pool.release(self.conn)
            self.conn = None

コネクションについては、接続プールに返却するために release メソッドにコネクションを渡します。connNone を設定することは必須ではありませんが、ガーベージコレクションの対象となることを示すために None を設定しています。

コンテキストマネージャ:__aenter__ 及び __aexit__

DbConnectAsyncPostgres クラスは、with 句を使用できるようにコンテキストマネージャーに対応して実装します。コンテキストマネージャーに対応するためには、具体的には __aenter__ メソッドと __aexit__ メソッドの実装が必要です。通常のコンテキストマネージャのメソッドとは違い「a」がついた非同期用のメソッドになっている違いがあることに注意してください。

    async def __aenter__(self):
        # DB接続
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):        
        # DBクローズ
        await self.close()

__aenter__ メソッドでは、DB 接続を行います。これにより with 句に入った際に DB 接続を行い、as で指定された変数に DB クラスのインスタンスを設定します。

with 句を抜ける際には __aexit__ メソッドが呼び出されます。この際に close メソッドでデータベースのクローズ処理を行います。

asyncpg では、自動コミットモードを採用しています。そのため、SQL を実行したタイミングで自動でコミットされます。これは 1 つの SQL をトランザクションと考えていることに相当します。

もし、複数の SQL で 1 つのトランザクションとしてとらえたい場合は、以降で説明する start_transactioncommit_transactionrollback_transaction を明示的に使用する必要があります。

なお、同期的なパッケージである psycopg2 では、自動コミットモードはデフォルトで OFF になっているため、明示的にコミットが必要です。各パッケージで考え方が違うことを覚えておいてください。

コンテキストマネージャーの基本については「コンテキストマネージャーの基本」を参考にしてください

SQL実行:execute_non_query

SQL 実行用のメソッドとして execute_non_query を用意しています。後述する execute_query との違いは返却値がないことで CREATE / INSERT / UPDATE / DELETE などの SQL を実行するために使用します。

    async def execute_non_query(
        self, sql: str, bind_var: tuple = None
    ) -> None:
        """CREATE/INSERT/UPDATE/DELETEのSQL実行メソッド

        Args:
            sql: 実行SQL
            bind_var: バインド変数

        Returns:
            None
        """
        # SQLの実行 (bind_varがNoneの場合は()を設定し、アンパックして渡す)
        await self.conn.execute(sql, *(bind_var or ()))

SQL を実行するためにはコネクションの execute メソッドに対象 SQL を渡します。また、バインド変数がある場合には引数に指定して実行することができます。

バインド変数は、アンパックして渡す必要がありますが or を使用して簡潔に記載しています。この記載方法では bind_varNone の場合は () が渡されます。

SQL実行:execute_query

SQL 実行用のメソッドとしてもう 1 つ execute_query を用意しています。上記の execute_non_query との違いは返却値があることです。SELECT してデータを取得する SQL を実行するために使用します。

    async def execute_query(
        self, sql: str, bind_var: tuple = None, count: int = 0
    ) -> list[asyncpg.Record]:
        """SELECTのSQL実行メソッド

        Args:
            sql: 実行SQL
            bind_var: バインド変数
            count: データ取得件数 (0は全ての行を取得)

        Returns:
            結果リスト (asyncpg.Record型)
        """
        stmt = await self.conn.prepare(sql)

        # データ取得件数が指定されている場合
        if count > 0:
            rows = await stmt.fetch(count, *(bind_var or ()))
        else:
            rows = await stmt.fetch(*(bind_var or ()))

        return rows

SQL を実行するためには、コネクションの fetch メソッドに対象となる SQL を渡しますが、その前に prepare メソッドで SQL を stmt として準備します。

SELECT 結果を全て取得する場合は stmt.fetch にバインド変数のみ渡して呼び出します。count が指定されている (>0) 場合には、count を引数に指定することでその件数を取得することができます。

なお、返却の rowsasyncpg.Record 型のリストとなります。必要に応じて結果を dict() で Python の辞書に変換することも可能です。

トランザクションの開始:start_transaction

asyncpg では、自動コミットモードを採用しています。そのため、SQL を実行したタイミングで自動でコミットされますが、複数 SQL で 1 つのトランザクションととらえたい場合があります。このような場合は、以下の start_transaction を使用します。

    async def start_transaction(self) -> None:
        """トランザクションの開始"""
        # コネクションでトランザクションを開始する
        self.transaction = self.conn.transaction()
        # トランザクションを開始する
        await self.transaction.start()

コネクションからトランザクションを開始するには、コネクションの transaction メソッドにより接続からトランザクションを用意し start メソッドでトランザクションを開始します。

コミット:commit_transaction

start_transaction によりトランザクションを開始した場合には、適切な位置でコミットを明示的に行う必要があります。この場合は commit_transaction を使用します。

    async def commit_transaction(self) -> None:
        """トランザクションのコミット"""
        await self.transaction.commit()

内部的にはトランザクションの commit メソッドを呼び出しています。コミット時に他の処理が必要な場合は追加してください。

ロールバック:rollback_transaction

コミット前のデータベースの変更をトランザクション開始時点まで戻したい場合は、ロールバックしてデータベースの状態を元に戻す必要があります。この場合には rollback_transaction を使用します。

    async def rollback_transaction(self) -> None:
        """トランザクションのロールバック"""
        await self.transaction.rollback()

内部的にはトランザクションの rollback メソッドを呼び出しています。ロールバック時に他の処理が必要な場合は追加してください。

DB アクセスクラスの使い方

上記で定義した DbConnectAsyncPostgres クラスは、他のプログラムから使用することが可能です。以降では、DbConnectPostgres クラスを使用して、データの追加 (INSERT)、データ検索 (SELECT) の使い方を紹介します。

基本的な使い方 (自動コミットモードを使用)

データを INSERT する

データを非同期に登録 (INSERT) したい場合には、以下のようにします。

import asyncio

from db_connect_asyncpg import DbConnectAsyncPostgres


async def insert_data(value1, value2):
    """データをインサートする (自動コミットモードを使用する場合)"""
    async with DbConnectAsyncPostgres() as db:
        insert_sql = (
            "INSERT INTO work.sample_table "
            "(str1, value1, last_update_datetime) "
            "VALUES(\$1, \$2, current_timestamp)"
        )
        await db.execute_non_query(insert_sql, (value1, value2))


async def main():
    # 接続プールを初期化する
    await DbConnectAsyncPostgres.initialize_connection_pool()

    # 非同期タスクを並行実行
    tasks = [insert_data(f"test_str{i}", 10 * i) for i in range(10)]
    await asyncio.gather(*tasks)

    # 接続プールをクローズする
    await DbConnectAsyncPostgres.close_connection_pool()


if __name__ == "__main__":
    # asyncio.runを使用してメインコルーチンを実行
    asyncio.run(main())

以降でポイントを説明していきます。

モジュールのインポート

DB アクセスクラスを使うには、作成モジュールを以下のようにインポートします。

from db_connect_asyncpg import DbConnectAsyncPostgres

接続プールの初期化とクローズ

今回のクラスでは接続プールを使用していますので、使用前に接続プールを初期化する必要があります。

    # 接続プールを初期化する
    await DbConnectAsyncPostgres.initialize_connection_pool()

    ...(省略)...

    # 接続プールをクローズする
    await DbConnectAsyncPostgres.close_connection_pool()

接続プールは、プールからコネクションを取得して使う考え方であるため、アプリケーション起動時に 1 度作成し、終了時にクローズします。

非同期 INSERT 関数

非同期な INSERT 用関数として insert_data 関数を定義しています。

async def insert_data(value1, value2):
    """データをインサートする (自動コミットモードを使用する場合)"""
    async with DbConnectAsyncPostgres() as db:
        insert_sql = (
            "INSERT INTO work.sample_table "
            "(str1, value1, last_update_datetime) "
            "VALUES(\$1, \$2, current_timestamp)"
        )
        await db.execute_non_query(insert_sql, (value1, value2))

今回作成したクラスは、コンテキストマネージャーを使用できるため、with 句を使ってインスタンスを作成することができます。

with 句に入った際に connect メソッドにより接続プールからコネクションが取得され with 句を抜ける際にコネクションは返却されます。asyncpg は、自動コミットモードとなっているため execute_non_query が実行されたタイミングでコミットされます。

また、SQL インジェクションを防ぐ手段の一つであるプレースホルダーですが、asyncpg では値を埋め込むための SQL のプレースホルダーとして $1$2 といった表記を使用し、引数に渡すタプルの値が順に設定されます。psycopg2 では %s といった表記であり少し異なっています。


非同期タスクの作成と実行

メインコルーチン (main) では、接続プールを生成した後にタスクを作成しています。

    # 非同期タスクを並行実行
    tasks = [insert_data(f"test_str{i}", 10 * i) for i in range(10)]
    await asyncio.gather(*tasks)

タスクは、リスト内包表記を使って 10 個のタスクを用意しています。このリストを非同期に実行する際に asyncio.gather を使用します。これにより、各タスクは非同期に実行されます。

トランザクションを明確に管理する使い方

asyncpg は、自動コミットモードを採用しているため、SQL を実行したタイミングで自動的にコミットまたはロールバックが行われます。これは 1 つの SQL を 1 トランザクションととらえていることになります。

しかし、実際には複数 SQL を 1 トランザクションとして考えたい場合もあります。そのような場合は、以下のように明示的にトランザクションの開始やコミットを実行することができます。

import asyncio

from db_connect_asyncpg import DbConnectAsyncPostgres


async def insert_data_with_transaction(value1, value2):
    """データをインサートする (トランザクションを明示的に管理する場合)"""

    async with DbConnectAsyncPostgres() as db:
        try:
            # トランザクションの開始
            await db.start_transaction()

            # インサートを実行
            insert_sql = (
                "INSERT INTO work.sample_table "
                "(str1, value1, last_update_datetime) "
                "VALUES(\$1, \$2, current_timestamp)"
            )
            await db.execute_non_query(insert_sql, (value1, value2))

            # コミット
            await db.commit_transaction()
        except Exception as ex:
            await db.rollback_transaction()
            print(f"エラー発生: {ex}")


async def main():
    # 接続プールを初期化する
    await DbConnectAsyncPostgres.initialize_connection_pool()

    # 非同期タスクを並行実行
    tasks = [
        insert_data_with_transaction(f"test_str{i}", 10 * i) for i in range(10)
    ]
    await asyncio.gather(*tasks)

    # 接続プールをクローズする
    await DbConnectAsyncPostgres.close_connection_pool()


if __name__ == "__main__":
    # asyncio.runを使用してメインコルーチンを実行
    asyncio.run(main())

基本的な使い方は先に説明したプログラムと同様ですが async with のブロックの中で明示的に start_transaction によりトランザクションを開始してます。

トランザクションを明示的に開始した場合には execute_non_query を実行したタイミングではコミットは行われません。そのため、明示的に commit_transaction でコミットを行う必要があります。また、try ... except ... を用いることで例外をキャッチした場合には rollback_transaction によりロールバックする構成となっています。

このように明示的にトランザクションの範囲を制御することで、複数 SQL の実行を 1 トランザクションとして使うことも可能になります。

今回は、INSERT の例を紹介しましたが UPDATEDELETE についても同様の考え方で使用することが可能です。

データを検索する

今回作成したクラスでデータを検索 (SELECT) する例についても見てみましょう。複数の SELECT を非同期に実行する場合には、以下のように実行します。

import asyncio
from pprint import pprint

from db_connect_asyncpg import DbConnectAsyncPostgres


async def select_data(target_id):
    """データを検索する"""
    async with DbConnectAsyncPostgres() as db:
        select_sql = "SELECT * FROM work.sample_table WHERE id = $1"
        result = await db.execute_query(select_sql, (target_id,))

        return result


async def main():
    # 接続プールを初期化する
    await DbConnectAsyncPostgres.initialize_connection_pool()

    # 非同期タスクを並行実行
    tasks = [
        select_data(1),
        select_data(2),
        select_data(3),
    ]
    results = await asyncio.gather(*tasks)
    pprint(results)

    # 接続プールをクローズする
    await DbConnectAsyncPostgres.close_connection_pool()


if __name__ == "__main__":
    # asyncio.runを使用してメインコルーチンを実行
    asyncio.run(main())
【実行結果】
[[<Record id=1 str1='test_str0' value1=0 last_update_datetime=datetime.datetime(2024, 3, 22, 6, 47, 5, 665664)>],
 [<Record id=2 str1='test_str1' value1=10 last_update_datetime=datetime.datetime(2024, 3, 22, 6, 47, 5, 840874)>],
 [<Record id=3 str1='test_str3' value1=30 last_update_datetime=datetime.datetime(2024, 3, 22, 6, 47, 5, 966835)>]]

SELECT のための関数として select_data という非同期関数を用意しています。これは特定の id のデータを検索する関数です。この関数では execute_query に SQL とバインド変数として受け取った id を渡して SELECT を実行し、実行結果を返却します。

非同期タスクの実行についても、これまでと同様でタスクのリストを用意し asyncio.gather に渡します。今回は id=1id=2id=3 の場合の SELECT を非同期に実行しています。asyncio.gather は結果をまとめてくれます。

今回の SELECT の例は非同期で実行する必要もないようなものかもしれませんが、非同期での検索の使い方を説明として紹介しています。より複雑で負荷の高い SELECT の場合、非同期で実行し、最後に結果をまとめることで効率的に検索ができます。

まとめ

Python で asyncpg を用いて非同期に PostgreSQL データベースへアクセスする方法を解説しました。

asyncpg は、PostgreSQL データベースとの非同期通信をサポートする Python のサードパーティ製ライブラリです。asyncpg は高速であることで知られており、今回は asyncpg を使って非同期に PostgreSQL へアクセスするためのクラスを作成してみました。また、作成したクラスの使い方についても例を使って紹介しています。

非同期プログラミングは、I/O バウンドな処理を実装するためには非常に有益な実装方法です。DB アクセスについても適切に使用することができれば、性能向上できる可能性が高くなります。ぜひ、asyncpg について使い方を理解していただき、非同期処理をしてみてもらいたいと思います。

あわせて読みたい
【Python Tech】プログラミングガイド
【Python Tech】プログラミングガイド
ABOUT ME
ホッシー
ホッシー
システムエンジニア
はじめまして。当サイトをご覧いただきありがとうございます。 私は製造業のメーカーで、DX推進や業務システムの設計・開発・導入を担当しているシステムエンジニアです。これまでに転職も経験しており、以前は大手電機メーカーでシステム開発に携わっていました。

プログラミング言語はこれまでC、C++、JAVA等を扱ってきましたが、最近では特に機械学習等の分析でも注目されているPythonについてとても興味をもって取り組んでいます。これまでの経験をもとに、Pythonに興味を持つ方のお役に立てるような情報を発信していきたいと思います。どうぞよろしくお願いいたします。

※キャラクターデザイン:ゼイルン様
記事URLをコピーしました