psycopg2

【Python】psycopg2を用いたPostgreSQLデータベースへのアクセス方法

【Python】psycopg2を用いたPostgreSQLデータベースへのアクセス方法

Pythonでpsycopg2を用いてPostgreSQLデータベースへアクセスする方法について解説します。

psycopg2を用いたPostgreSQLアクセス

PostgreSQLは、オープンソースのリレーショナルデータベース(RDB)です。コミュニティによる強力なサポートと継続的な開発により、企業レベルのアプリケーションを含めて世界中で広く利用されています。信頼性や柔軟性が高く、オープンソースであることも多くの開発者に選ばれている理由です。

PythonでPostgreSQLのデータベースへアクセスするには、psycopg2モジュールが使用できます。psycopg2は、多くのプロジェクトやアプリケーションで使用されていることから実績が広く認められており、デファクトスタンダードの位置づけにあります。

本記事では、psycopg2を用いてPostgreSQLデータベースへアクセスする方法について説明します。具体的には、psycopg2を用いてPostgreSQLへアクセスするためのクラスを作成し、データベースへアクセスするプログラムを作成します。なお、本記事で紹介するクラスは、コンテキストマネージャーや接続プールにも対応します。

psycopg2のインストール

psycopg2を使用するには、以下のコマンドを用いてpipでインストールを行ってください。

pip install psycopg2

pipインストールが完了すれば、psycopg2を用いてPostgreSQLへアクセスするための準備ができます。

サンプルプログラムの前提条件

本記事では、PostgreSQLのインストール自体については説明を省略します。以降で紹介するサンプルプログラムを実行するにあたっての前提条件は以下とします。

  1. PostgreSQLがローカルPCにインストールされているものとする。
  2. DB「test」、スキーマ「work」が作成されているものとする。
  3. ユーザー「test」が作成されており、パスワードは「PAssw0rd」とする。また、workスキーマに対して適切に権限が設定されているものとする。(GRANT ALL ON SCHEMA work TO test;

以降では、設定ファイルの記載も含めて紹介しています。ご利用の環境に合わせて設定ファイルやプログラムは修正して動作させることも可能です。

今回は、psycopg2の使用方法に焦点を当てるため細かな例外処理を省略していますのでご注意ください。実際のアプリケーションの利用では、例外処理を十分に考慮する必要があります。

psycopg2を用いたPostgreSQLアクセスクラス

psycopg2を用いてPostgreSQLデータベースへアクセスするためのクラスを作成します。設定ファイルおよびサンプルプログラムをまず紹介します。具体的な詳細については以降で説明していきます。

【dbconfig.ini】DBアクセス設定ファイル

[POSTGRESQL_DB_SERVER]
host = localhost
port = 5432
dbname = test
user = test
password = PAssw0rd
[POOL]
minconn = 1
maxconn = 10

【db_connect_postgresql.py】DBアクセス用クラス

import configparser

import psycopg2
import psycopg2.extras
from psycopg2 import pool


class DbConnectPostgres:
    # 接続プール
    connection_pool = None

    @classmethod
    def initialize_connection_pool(cls, config_file="./dbconfig.ini") -> None:
        """接続プールの初期化

        Args:
            config_file: 設定ファイルパス

        Returns:
            None
        """
        if cls.connection_pool is None:
            # コンフィグファイルからデータを取得
            config_db = configparser.ConfigParser()
            config_db.read(config_file)

            # 接続情報の取得
            host = config_db["POSTGRESQL_DB_SERVER"]["host"]
            port = config_db["POSTGRESQL_DB_SERVER"]["port"]
            dbname = config_db["POSTGRESQL_DB_SERVER"]["dbname"]
            user = config_db["POSTGRESQL_DB_SERVER"]["user"]
            password = config_db["POSTGRESQL_DB_SERVER"]["password"]
            # プール設定の取得
            minconn = int(config_db["POOL"]["minconn"])
            maxconn = int(config_db["POOL"]["maxconn"])

            # 接続プールを初期化
            cls.connection_pool = pool.ThreadedConnectionPool(
                minconn=minconn,
                maxconn=maxconn,
                host=host,
                port=port,
                dbname=dbname,
                user=user,
                password=password,
            )

    @classmethod
    def close_connection_pool(cls):
        """接続プールをクローズする"""
        if cls.connection_pool:
            cls.connection_pool.closeall()
            cls.connection_pool = None

    def __init__(self) -> None:
        """コンストラクタ"""
        # コネクションとカーソル
        self.conn = None
        self.cursor = None

    def connect(self) -> None:
        """DB接続"""
        if self.connection_pool is None:
            raise Exception("接続プールが初期化されていません")

        # 接続プールからコネクションを取得
        self.conn = self.connection_pool.getconn()
        # カーソルを取得
        self.cursor = self.conn.cursor(
            cursor_factory=psycopg2.extras.DictCursor
        )

    def close(self) -> None:
        """DBクローズ"""
        if self.cursor:
            # カーソルをクローズする
            self.cursor.close()
            self.cursor = None
        if self.conn:
            # 接続をプールに返却する
            self.connection_pool.putconn(self.conn)
            self.conn = None

    def __enter__(self):
        # DB接続
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            # 例外発生時はロールバック
            self.rollback()
            raise
        else:
            # 例外がなければコミット
            self.commit()

        # DBクローズ
        self.close()

    def execute_non_query(self, sql: str, bind_var: tuple = None) -> None:
        """CREATE/INSERT/UPDATE/DELETEのSQL実行メソッド

        Args:
            sql: 実行SQL
            bind_var: バインド変数

        Returns:
            None
        """
        # SQLの実行
        if bind_var:
            # バインド変数がある場合は指定して実行
            self.cursor.execute(sql, bind_var)
        else:
            self.cursor.execute(sql)

    def execute_query(
        self, sql: str, bind_var: tuple = None, count: int = 0
    ) -> list[psycopg2.extras.DictRow]:
        """SELECTのSQL実行メソッド

        Args:
            sql: 実行SQL
            bind_var: バインド変数
            count: データ取得件数

        Returns:
            結果リスト
        """
        # SQLの実行
        if bind_var:
            # バインド変数がある場合は指定して実行
            self.cursor.execute(sql, bind_var)
        else:
            self.cursor.execute(sql)

        if count == 0:
            rows = self.cursor.fetchall()
        else:
            # 件数指定がある場合はその件数分を取得する
            rows = self.cursor.fetchmany(count)

        return rows

    def commit(self) -> None:
        """コミット"""
        self.conn.commit()

    def rollback(self) -> None:
        """ロールバック"""
        self.conn.rollback()

サンプルコードの詳細説明

DB接続設定ファイル dbconfig.ini

データベース(DB)へアクセスする際には、接続のための設定情報が必要です。DBの設定情報は、DBアクセス設定ファイルとしてdbconfig.iniとして用意します。

このファイルは、以降で紹介するプログラム内でconfigparserモジュールを用いて内容を取得します。configparserの使用方法については「configparserによる設定ファイル管理」にまとめていますので参考にしてください。

[POSTGRESQL_DB_SERVER]
host = localhost
port = 5432
dbname = test
user = test
password = PAssw0rd
[POOL]
minconn = 1
maxconn = 10

設定ファイルには、以下の情報を記載しています。

設定値概要
host具体的なサーバーのホスト名やIPアドレスを記載します。今回はローカルPCを使うためlocalhostとしています。
portPostgreSQLへアクセスするためのポートです。PostgreSQLではデフォルトで5432が使用されます。お使いの環境で変更している場合は修正が必要です。
dbnameアクセスするデータベース名を記載します。
userアクセスするユーザーを記載します。
passwordアクセス時のパスワードを記載します。

以下の項目については、データベースへの接続プールに関する設定項目です。

設定値概要
minconn接続プールにおける最小コネクション数を表します。
maxconn接続プールにおける最大コネクション数を表します。

今回は簡単のためパスワードは平文のまま記載していますが、セキュリティ上適切ではありません。実際には暗号化等を考慮する必要がある点に注意してください。

DBアクセス用クラス DbConnectPostgres

DBアクセス用のクラスとしてDbConnectPostgresというクラスを定義しています。このクラスをDBを使用するプログラムから呼び出すことでPostgreSQLへ簡単にアクセスできます。

また、コンテキストマネージャーに対応させることでwith句を用いた資源管理ができるようにしています。コンテキストマネージャーについては「コンテキストマネージャーの基本」でまとめているので参考にしてください。

定義メソッドの概要は以下になります。

メソッド名概要
initialize_connection_pool接続プールを作成する初期化メソッドです。設定ファイルから接続状況を読み込み、接続プールを作成します。
このメソッドはclassmethodとして定義し、アプリケーション起動時に呼び出します。
close_connection_pool接続プールをクローズするメソッドです。
このメソッドはclassmethodとして定義し、アプリケーション終了時に呼び出します。
__init__DbConnectPostgresクラスのコンストラクタです。
connect接続プールからコネクションを取得し、カーソルの作成を行います。
closeデータベースのクローズ処理を行います。コネクションは、接続プールへ返却します。
__enter__with句を実行する際に呼び出されるコンテキストマネージャーのメソッドです。
connectメソッドを呼び出すことでDB接続を行い、戻り値をasで指定された変数に反映します。
__exit__with句を抜ける時に呼び出されるコンテキストマネージャーのメソッドです。例外が発生した場合には、exc_typeexc_valexc_tbに例外情報が渡されます。
例外発生時はロールバックし、例外がなければコミットを行ったうえで、DB接続をクローズします。
execute_non_queryCREATE/INSERT/UPDATE/DELETE処理用のSQL実行メソッドです。SQL文と必要に応じてバインド変数を渡して処理を実行します。
execute_querySELECT処理用のSQL実行メソッドです。返却値としては辞書のリストを返します。SQL文と必要に応じてバインド変数、データ取得件数を渡して処理を実行します。
commitコミットして変更内容を確定します。
rollbackロールバックして変更内容を破棄します。

それぞれのメソッドのポイントについて以降で解説していきます。

接続プールの初期化:initialize_connection_pool

initialize_connection_poolは、接続プールの初期化を行います。初期化の際には、設定ファイルからDBの接続情報と接続プールの設定値を取得します。

    # 接続プール
    connection_pool = None

    @classmethod
    def initialize_connection_pool(cls, config_file="./dbconfig.ini") -> None:
        """接続プールの初期化

        Args:
            config_file: 設定ファイルパス

        Returns:
            None
        """
        if cls.connection_pool is None:
            # コンフィグファイルからデータを取得
            config_db = configparser.ConfigParser()
            config_db.read(config_file)

            # 接続情報の取得
            host = config_db["POSTGRESQL_DB_SERVER"]["host"]
            port = config_db["POSTGRESQL_DB_SERVER"]["port"]
            dbname = config_db["POSTGRESQL_DB_SERVER"]["dbname"]
            user = config_db["POSTGRESQL_DB_SERVER"]["user"]
            password = config_db["POSTGRESQL_DB_SERVER"]["password"]
            # プール設定の取得
            minconn = int(config_db["POOL"]["minconn"])
            maxconn = int(config_db["POOL"]["maxconn"])

            # 接続プールを初期化
            cls.connection_pool = pool.ThreadedConnectionPool(
                minconn=minconn,
                maxconn=maxconn,
                host=host,
                port=port,
                dbname=dbname,
                user=user,
                password=password,
            )

initialize_connection_poolは、@classmethodとしてクラスメソッドとして定義し、アプリケーション起動時に1度呼び出します。コンストラクタ(__init__)で定義しないのはインスタンス化のたびに接続プールを作成する必要はないためです。

接続プールを作成するには「from psycopg2 import pool」というようにインポートが必要であることに注意してください。pool.ThreadConnectionPoolによりマルチスレッドに対応可能な接続プールを生成できます。

なお、minconnは最小接続数、maxconnは最大接続数を表しており、今回の設定ファイルでは、minconn=1maxconn=10としているので、最大10のコネクションを使用することが可能です。

接続プールのクローズ:close_connection_pool

接続プールについては、アプリケーションで不要になったときにclose_connection_poolを使用してクローズします。

    @classmethod
    def close_connection_pool(cls):
        """接続プールをクローズする"""
        if cls.connection_pool:
            cls.connection_pool.closeall()
            cls.connection_pool = None

接続プールをクローズする際にはcloseallメソッドを使用して、すべての接続をクローズします。connection_poolNoneを設定することは必須ではありませんが、ガーベージコレクションの対象となることを示すためにNoneを設定しています。

コンストラクタ:__init__

コンストラクタ(__init__)では、DBへの接続インスタンスごとにコネクションとカーソルを用意しておきます。具体的なコネクション取得やカーソル生成は、以降のconnectメソッドで行います。

    def __init__(self) -> None:
        """コンストラクタ"""
        # コネクションとカーソル
        self.conn = None
        self.cursor = None

DB接続:connect

connectメソッドでは、接続プールからコネクションを取得して、カーソルを用意します。

    def connect(self) -> None:
        """DB接続"""
        if self.connection_pool is None:
            raise Exception("接続プールが初期化されていません")

        # 接続プールからコネクションを取得
        self.conn = self.connection_pool.getconn()
        # カーソルを取得
        self.cursor = self.conn.cursor(
            cursor_factory=psycopg2.extras.DictCursor
        )

接続のためのコネクションは接続プールから取得します。コネクションを取得する際には、接続プールから「self.connection_pool.getconn()」というようにgetconnメソッドを使用してコネクションを取得します。カーソルは、取得したコネクションのcursorメソッドを使用することでカーソルが作成できます。

デフォルトでは、データの返却値はタプルのリストとなります。カーソルを辞書型で扱えるようにするには、cursor_factory引数にpsycopg2.extras.DictCursorを指定することで、データを辞書のリストとして扱うことが可能です。

DBクローズ:close

closeメソッドでは、データベースのクローズ処理を行います。

    def close(self) -> None:
        """DBクローズ"""
        if self.cursor:
            # カーソルをクローズする
            self.cursor.close()
            self.cursor = None
        if self.conn:
            # 接続をプールに返却する
            self.connection_pool.putconn(self.conn)
            self.conn = None

作成したカーソル(cursor)のクローズ処理をcloseメソッドにより行います。コネクションについては、接続プールに返却するためにputconnメソッドにコネクションを渡します。

cursorconnNoneを設定することは必須ではありませんが、ガーベージコレクションの対象となることを示すためにNoneを設定しています。

コンテキストマネージャ:__enter__ 及び __exit__

DbConnectPostgresクラスは、with句を用いて使用できるようにコンテキストマネージャーに対応できるように実装します。コンテキストマネージャーに対応するためには、具体的には、__enter__メソッドと__exit__メソッドの実装が必要です。コンテキストマネージャーについては「コンテキストマネージャーの基本」でまとめているので参考にしてください。

    def __enter__(self):
        # DB接続
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            # 例外発生時はロールバック
            self.rollback()
            raise
        else:
            # 例外がなければコミット
            self.commit()

        # DBクローズ
        self.close()

__enter__メソッドでは、DB接続を行います。これによりwith句に入った際にDB接続を行い、asで指定された変数にDBクラスのインスタンスを設定します。

with句を抜ける際には、__exit__メソッドが呼び出されます。この際、例外が発生している場合には、exc_typeexc_valexc_tbに例外情報が渡されます。例外が発生した場合はロールバックし、再度例外をraiseにより上位へ送っています。例外がなければコミットし、その後にDBの接続を終了します。

このようにコンテキストマネージャーに対応させるとwith句を一つのトランザクションとして考えることもでき、クローズ処理を忘れることを防止できます。

Note

トランザクションの考え方

psycopg2では、SQLを実行したタイミングでトランザクションが開始されますが、自動コミットモードはOFFになっています。そのため、明示的に
commitが実行されるまで変更が反映されません。

今回紹介しているクラスではwithを抜ける際にcommitrollbackが実行されますが、withブロック内の処理の途中で明示的にコミットしたい場合は適宜commitメソッドを呼び出す必要があることに注意してください。

SQL実行:execute_non_query

SQL実行用のメソッドとしてexecute_non_queryを用意しています。後述するexecute_queryとの違いは返却値がないことで、CREATE/INSERT/UPDATE/DELETEなどのSQLを実行するために使用します。

    def execute_non_query(self, sql: str, bind_var: tuple = None) -> None:
        """CREATE/INSERT/UPDATE/DELETEのSQL実行メソッド

        Args:
            sql: 実行SQL
            bind_var: バインド変数

        Returns:
            None
        """
        # SQLの実行
        if bind_var:
            # バインド変数がある場合は指定して実行
            self.cursor.execute(sql, bind_var)
        else:
            self.cursor.execute(sql)

SQLを実行するためには、カーソルのexecuteメソッドに対象のSQLを渡します。また、バインド変数がある場合には引数に指定して実行することができます。

SQL実行:execute_query

SQL実行用のメソッドとしてもう一つexecute_queryを用意しています。上記のexecute_non_queryとの違いは、返却値があることです。SELECTしてデータを取得するSQLを実行するために使用します。

    def execute_query(
        self, sql: str, bind_var: tuple = None, count: int = 0
    ) -> list[psycopg2.extras.DictRow]:
        """SELECTのSQL実行メソッド

        Args:
            sql: 実行SQL
            bind_var: バインド変数
            count: データ取得件数

        Returns:
            結果リスト
        """
        # SQLの実行
        if bind_var:
            # バインド変数がある場合は指定して実行
            self.cursor.execute(sql, bind_var)
        else:
            self.cursor.execute(sql)

        if count == 0:
            rows = self.cursor.fetchall()
        else:
            # 件数指定がある場合はその件数分を取得する
            rows = self.cursor.fetchmany(count)

        return rows

SQLを実行するためには、カーソルのexecuteメソッドに対象となるSQLを渡します。また、バインド変数がある場合には引数に指定して実行することができます。

SELECT結果を全て取得する場合は、fetchallメソッドを使用します。また、件数を指定する場合には、fetchmanyメソッドに取得件数を指定します。なお、今回は使用していませんが1件データを取得するfetchoneメソッドもあります。

返却値はpsycopg2.extras.DictRowのリストとなっています。psycopg2.extras.DictRowは、辞書のようにアクセスできる一方で、タプルのように振舞うことも可能なデータ型です。用途によって辞書として扱いたい場合は、dict()によりPythonの辞書型に変換することも可能です。

コミット:commit

データベースへの変更を確定させるためにはコミットをする必要があります。そのためのメソッドとしてcommitメソッドを用意しています。

    def commit(self) -> None:
        """コミット"""
        self.conn.commit()

内部的には、コネクションのcommitメソッドを呼び出しているだけです。もし、コミット時に他の処理が必要な場合は追加してください。

ロールバック:rollback

コミット前のデータベースへの変更を取り消したい場合は、ロールバックしてデータベースの状態を元に戻す必要があります。そのためのメソッドとしてrollbackメソッドを用意しています。

    def rollback(self) -> None:
        """ロールバック"""
        self.conn.rollback()

内部的には、コネクションのrollbackメソッドを呼び出しているだけです。もし、ロールバック時に他の処理が必要な場合は追加してください。

DBアクセス用クラスの使い方

上記で定義したDbConnectPostgresクラスは、他のプログラムから使用することが可能です。

以下のdbaccess_sample.pyは、DbConnectPostgresクラスを使用して、テーブルの作成(CREATE TABLE)、データの追加(INSERT)、データ検索(SELECT)の使い方の例を記載しています。それぞれのパートの内容を順に説明します。

【dbaccess_sample.py】

from db_connect_postgresql import DbConnectPostgres


def main():
    # 接続プールの初期化
    DbConnectPostgres.initialize_connection_pool()

    with DbConnectPostgres() as db:
        # ===== テーブルを作成する
        create_sql = (
            "CREATE TABLE IF NOT EXISTS work.sample_table"
            "("
            "id serial NOT NULL, "
            "str1 character varying(50), "
            "value1 integer, "
            "last_update_datetime timestamp, "
            "PRIMARY KEY (id)"
            ")"
        )
        db.execute_non_query(create_sql)

        # ===== データをINSERTする
        insert_sql = (
            "INSERT INTO work.sample_table "
            "(str1, value1, last_update_datetime) "
            "VALUES(%s, %s, current_timestamp)"
        )
        for i in range(5):
            input_str = f"test_str{i}"
            input_val = 10 * i
            bind = (input_str, input_val)
            db.execute_non_query(insert_sql, bind)

        # ===== データを検索する (全て検索)
        print("===== 全件検索")
        select_sql = "SELECT * FROM work.sample_table"
        select_result = db.execute_query(select_sql)
        print(select_result)

        # ===== 件数を指定して検索する (以下の例は3件)
        print("===== 取得する件数の指定")
        select_sql = "SELECT * FROM work.sample_table"
        select_result = db.execute_query(select_sql, count=3)
        print(select_result)

        # ===== 条件を指定して検索する
        print("===== 条件を指定して実行する")
        select_sql = "SELECT * FROM work.sample_table WHERE value1 > %s"
        print("条件1")
        select_result = db.execute_query(select_sql, (10,))
        print(select_result)
        print("条件2")
        select_result = db.execute_query(select_sql, (30,))
        print(select_result)

    # 接続プールのクローズ
    DbConnectPostgres.close_connection_pool()


if __name__ == "__main__":
    main()
【実行結果例】
===== 全件検索
[[1, 'test_str0', 0, datetime.datetime(2024, 3, 17, 18, 39, 38, 987723)], [2, 'test_str1', 10, datetime.datetime(2024, 3, 17, 18, 39, 38, 987723)], [3, 'test_str2', 20, datetime.datetime(2024, 3, 17, 18, 39, 38, 987723)], [4, 'test_str3', 30, datetime.datetime(2024, 3, 17, 18, 39, 38, 987723)], [5, 'test_str4', 40, datetime.datetime(2024, 3, 17, 18, 39, 38, 987723)]]
===== 取得する件数の指定
[[1, 'test_str0', 0, datetime.datetime(2024, 3, 17, 18, 39, 38, 987723)], [2, 'test_str1', 10, datetime.datetime(2024, 3, 17, 18, 39, 38, 987723)], [3, 'test_str2', 20, datetime.datetime(2024, 3, 17, 18, 39, 38, 987723)]]
===== 条件を指定して実行する
条件1
[[3, 'test_str2', 20, datetime.datetime(2024, 3, 17, 18, 39, 38, 987723)], [4, 'test_str3', 30, datetime.datetime(2024, 3, 17, 18, 39, 38, 987723)], [5, 'test_str4', 40, datetime.datetime(2024, 3, 17, 18, 39, 38, 987723)]]
条件2
[[5, 'test_str4', 40, datetime.datetime(2024, 3, 17, 18, 39, 38, 987723)]]

モジュールのインポート

DBアクセスクラスを使うには、作成したモジュールを以下のようにインポートしてください。

from db_connect_postgresql import DbConnectPostgres

接続プールの作成とクローズ

今回のクラスでは接続プールを使用していますので、使用前に接続プールを初期化する必要があります。

    # 接続プールの初期化
    DbConnectPostgres.initialize_connection_pool()

    ...(省略)...

    # 接続プールのクローズ
    DbConnectPostgres.close_connection_pool()

上記のようにアプリケーションの最初に接続プールを初期化し、最後には接続プールをクローズするようにしてください。

DBアクセスクラスのインスタンス化

クラスはコンテキストマネージャーを使用できるため、以下のようにwith句を使用してインスタンスを作成することができます。

    with DbConnectPostgres() as db:
        ...(処理)...

with句に入った際に、connectメソッドにより接続プールからコネクションが取得され、with句を抜けた際に、必要に応じてロールバックやコミットが実行されます。これによりwithを一つのトランザクションとして扱うことが可能です。

なお、with句を使用しなくてもインスタンス化は可能ですが、その場合は、接続(connect)やクローズ(close)、コミット(commit)やロールバック(rollback)といった処理は自分で適切に呼び出す必要がありますので注意してください。

テーブルの作成(CREATE TABLE

CREATE TABLEでテーブル作成しているのが以下の部分です。

        # ===== テーブルを作成する
        create_sql = (
            "CREATE TABLE IF NOT EXISTS work.sample_table"
            "("
            "id serial NOT NULL, "
            "str1 character varying(50), "
            "value1 integer, "
            "last_update_datetime timestamp, "
            "PRIMARY KEY (id)"
            ")"
        )
        db.execute_non_query(create_sql)

上記のようにCREATE用のSQL文を用意して、execute_non_queryメソッドに渡すことでテーブルの作成が可能です。

データの登録(INSERT

INSERTでデータを登録しているのが以下の部分です。

        # ===== データをINSERTする
        insert_sql = (
            "INSERT INTO work.sample_table "
            "(str1, value1, last_update_datetime) "
            "VALUES(%s, %s, current_timestamp)"
        )
        for i in range(5):
            input_str = f"test_str{i}"
            input_val = 10 * i
            bind = (input_str, input_val)
            db.execute_non_query(insert_sql, bind)

今回の例では、5件データを追加しています。SQLでは、値を埋め込む部分を%sで記載し、バインド変数を用意して渡すことで入力値を変えつつINSERTしています。

なお、今回idserialで用意されているため自動採番されます。そのため、このプログラムは実行するたびに5件ずつデータが追加されるという動きをします。

データの検索(SELECT

SELECTで登録したデータを検索しているのが以下の部分です。今回は3つほど例を用意しています。

        # ===== データを検索する (全て検索)
        print("===== 全件検索")
        select_sql = "SELECT * FROM work.sample_table"
        select_result = db.execute_query(select_sql)
        print(select_result)

        # ===== 件数を指定して検索する (以下の例は3件)
        print("===== 取得する件数の指定")
        select_sql = "SELECT * FROM work.sample_table"
        select_result = db.execute_query(select_sql, count=3)
        print(select_result)

        # ===== 条件を指定して検索する
        print("===== 条件を指定して実行する")
        select_sql = "SELECT * FROM work.sample_table WHERE value1 > %s"
        print("条件1")
        select_result = db.execute_query(select_sql, (10,))
        print(select_result)
        print("条件2")
        select_result = db.execute_query(select_sql, (30,))
        print(select_result)

全件検索する場合には「SELECT *」で検索すれば情報を取得できます。また、count引数に数字を指定すれば全件のうち指定した件数のみ取得できます。

条件を指定する場合には、WHERE句に具体的に値を設定したSQLをそのまま渡してもいいですが、上記の例では条件の値を%sとして、バインド変数で値を変えながら実行しています。これにより条件に一致するデータのみ検索出来ていることが分かります。

まとめ

Pythonでpsycopg2を用いてPostgreSQLデータベースへアクセスする方法について解説しました。

PostgreSQLは、オープンソースのリレーショナルデータベース(RDB)で、企業レベルのアプリケーションを含めて世界中で広く利用されています。psycopg2モジュールは、PythonでPostgreSQLへアクセスができるもので、多くのプロジェクトやアプリケーションで使用されていることから、デファクトスタンダードの位置づけにあります。

本記事では、PostgreSQLへアクセスするためのクラスを作成し、データベースへアクセスする方法を紹介しました。ぜひ、基本を理解してもらって使いこなしてもらえたらと思います。

Note

psycopg2の公式ドキュメントページはこちらを参考にしてください。