psycopg2

【Python】psycopg2を用いたPostgreSQLデータベースへのアクセス方法

【Python】psycopg2を用いたPostgreSQLデータベースへのアクセス方法

Pythonでpsycopg2を用いてPostgreSQLデータベースへアクセスする方法について解説します。

PythonでPostgreSQLにアクセスする

pythonでPostgreSQLのデータベースへアクセスする方法として、psycopg2モジュールを使用する方法があります。

本記事では、psycopg2を用いたPostgreSQLデータベースへのアクセス用クラスを作って使ってみます。

Note

psycopg2の公式ドキュメントページはこちらを参考にしてください。

psycopg2のインストール

psycopg2を使用するには、以下のコマンドを用いてpipでインストールを行ってください。

pip install psycopg2

pipインストールが完了すれば、psycopg2を用いてPostgreSQLへアクセスするための準備ができます。以降では、PostgreSQLのデータベースへアクセスするためのクラスを作って、PostgreSQLデータベースへアクセスする方法について紹介していきます。

psycopg2を用いたPostgreSQLデータベースへのアクセス

以降では、psycopg2を使用してPostgreSQLデータベースへアクセスするためのプログラムを紹介していきます。

サンプルプログラムの前提条件

以降で紹介するサンプルプログラムを実行するにあたっての前提条件は以下とします。

  • PostgreSQLがローカルPCにインストールされているものとします。
  • DB「test」、スキーマ「work」が作成されているものとします。
  • ユーザーとして「test」が作成されており、パスワードは「PAssw0rd」とします。
  • testユーザーは、workスキーマに対して以下のように変更できる権限を設定されているものとします。
    GRANT ALL ON SCHEMA work TO test;
  • dbconfig.ini(詳細は後述)は、Pythonプログラムと同フォルダに配置するものとします。

以降で設定ファイルの記載も含めて紹介していますので、各環境にあわせて設定ファイルやプログラムを少し変更してもらえば異なる環境で動かすことも可能です。コードをそのまま実行する場合の前提条件ということでご理解ください。

psycopg2を用いたPostgreSQLデータベースアクセスのためのサンプルプログラム

psycopg2を用いてPostgreSQLデータベースへアクセスするための設定ファイルおよびサンプルプログラム、実行結果は以下のようになります。

前述した前提に従った環境であれば、このままコピーしていただくだけでも使用することができるかと思います。それぞれの具体的な詳細については次節で説明します。

【dbconfig.ini】DBアクセス設定ファイル

[POSTGRESQL_DB_SERVER]
host = localhost
port = 5432
dbname = test
user = test
password = PAssw0rd

【db_connect_postgresql.py】DBアクセス用クラスと使い方例を含むプログラム

import psycopg2
import psycopg2.extras
import configparser


class DbConnectPostgres:
    def __init__(self) -> None:
        """コンストラクタ
        :return: None
        """
        # コンフィグファイルからデータを取得
        config_db = configparser.ConfigParser()
        config_db.read("./dbconfig.ini")

        # 接続情報の取得
        host = config_db["POSTGRESQL_DB_SERVER"]["host"]
        port = config_db["POSTGRESQL_DB_SERVER"]["port"]
        dbname = config_db["POSTGRESQL_DB_SERVER"]["dbname"]
        user = config_db["POSTGRESQL_DB_SERVER"]["user"]
        password = config_db["POSTGRESQL_DB_SERVER"]["password"]

        # コネクションを確立する
        self.con = psycopg2.connect(
            host=host, port=port, dbname=dbname, user=user, password=password
        )
        self.con.set_client_encoding("utf-8")

        # カーソルを作成する
        self.cursor = self.con.cursor(cursor_factory=psycopg2.extras.DictCursor)

    def execute_non_query(self, sql: str, bind_var: tuple = None) -> None:
        """CREATE/INSERT/UPDATE/DELETEのSQL実行メソッド
        :param sql: 実行SQL
        :param bind_var: バインド変数
        :return: None
        """
        # SQLの実行
        if bind_var is None:
            self.cursor.execute(sql)
        else:
            # バインド変数がある場合は指定して実行
            self.cursor.execute(sql, bind_var)

    def execute_query(self, sql: str, bind_var: tuple = None, count: int = 0) -> list:
        """SELECTのSQL実行メソッド
        :param sql: 実行SQL
        :param bind_var: バインド変数
        :param count: データ取得件数
        :return: 結果リスト
        """
        # SQLの実行
        if bind_var is None:
            self.cursor.execute(sql)
        else:
            # バインド変数がある場合は指定して実行
            self.cursor.execute(sql, bind_var)

        result = []
        if count == 0:
            rows = self.cursor.fetchall()
            for row in rows:
                result.append(dict(row))
        else:
            # 件数指定がある場合はその件数分を取得する
            rows = self.cursor.fetchmany(count)
            for row in rows:
                result.append(dict(row))

        return result

    def commit(self) -> None:
        """コミット
        :return: None
        """
        self.con.commit()

    def rollback(self) -> None:
        """ロールバック
        :return: None
        """
        self.con.rollback()

    def __del__(self) -> None:
        """デストラクタ
        :return: None
        """
        self.cursor.close()
        self.con.close()


def main():
    db_postgre = DbConnectPostgres()

    # ===== テーブルを作成する
    create_sql = (
        "CREATE TABLE IF NOT EXISTS work.sample_table"
        "("
        "id serial NOT NULL, "
        "str1 character varying(50), "
        "value1 integer, "
        "last_update_datetime timestamp, "
        "PRIMARY KEY (id)"
        ")"
    )
    db_postgre.execute_non_query(create_sql)
    # コミット
    db_postgre.commit()

    # ===== データをINSERTする
    insert_sql = (
        "INSERT INTO work.sample_table "
        "(str1, value1, last_update_datetime) "
        "VALUES(%s, %s, current_timestamp)"
    )
    for i in range(5):
        input_str = f"test_str{i}"
        input_val = 10 * i
        bind = (input_str, input_val)
        db_postgre.execute_non_query(insert_sql, bind)
    # コミット
    db_postgre.commit()

    # ===== データを検索する (全て検索)
    print("===== 全件検索")
    select_sql = "SELECT * FROM work.sample_table"
    select_result = db_postgre.execute_query(select_sql)
    print(select_result)

    # ===== 件数を指定して検索する (以下の例は3件)
    print("===== 取得する件数の指定")
    select_sql = "SELECT * FROM work.sample_table"
    select_result = db_postgre.execute_query(select_sql, count=3)
    print(select_result)

    # ===== 条件を指定して検索する
    print("===== 条件を指定して実行する")
    select_sql = "SELECT * FROM work.sample_table WHERE value1 > %s"
    print("条件1")
    select_result = db_postgre.execute_query(select_sql, (10,))
    print(select_result)
    print("条件2")
    select_result = db_postgre.execute_query(select_sql, (30,))
    print(select_result)


if __name__ == "__main__":
    main()
【実行結果】
===== 全件検索
[{'id': 1, 'str1': 'test_str0', 'value1': 0, 'last_update_datetime': datetime.datetime(2022, 4, 10, 19, 50, 0, 18996)}, {'id': 2, 'str1': 'test_str1', 'value1': 10, 'last_update_datetime': datetime.datetime(2022, 4, 10, 19, 50, 0, 18996)}, {'id': 3, 'str1': 'test_str2', 'value1': 20, 'last_update_datetime': datetime.datetime(2022, 4, 10, 19, 50, 0, 18996)}, {'id': 4, 'str1': 'test_str3', 'value1': 30, 'last_update_datetime': datetime.datetime(2022, 4, 10, 19, 50, 0, 18996)}, {'id': 5, 'str1': 'test_str4', 'value1': 40, 'last_update_datetime': datetime.datetime(2022, 4, 10, 19, 50, 0, 18996)}]
===== 取得する件数の指定
[{'id': 1, 'str1': 'test_str0', 'value1': 0, 'last_update_datetime': datetime.datetime(2022, 4, 10, 19, 50, 0, 18996)}, {'id': 2, 'str1': 'test_str1', 'value1': 10, 'last_update_datetime': datetime.datetime(2022, 4, 10, 19, 50, 0, 18996)}, {'id': 3, 'str1': 'test_str2', 'value1': 20, 'last_update_datetime': datetime.datetime(2022, 4, 10, 19, 50, 0, 18996)}]
===== 条件を指定して実行する
条件1
[{'id': 3, 'str1': 'test_str2', 'value1': 20, 'last_update_datetime': datetime.datetime(2022, 4, 10, 19, 50, 0, 18996)}, {'id': 4, 'str1': 'test_str3', 'value1': 30, 'last_update_datetime': datetime.datetime(2022, 4, 10, 19, 50, 0, 18996)}, {'id': 5, 'str1': 'test_str4', 'value1': 40, 'last_update_datetime': datetime.datetime(2022, 4, 10, 19, 50, 0, 18996)}]
条件2
[{'id': 5, 'str1': 'test_str4', 'value1': 40, 'last_update_datetime': datetime.datetime(2022, 4, 10, 19, 50, 0, 18996)}]

細かな例外処理はしていませんのでご注意ください。プログラムを参考にして使用いただいても構いませんが、ご自身の責任にてお願いします。

サンプルコードの詳細説明

DB接続設定ファイル dbconfig.ini

まずは、DB接続設定ファイルをdbconfig.iniとして用意します。ファイル名は呼び出し時の指定と揃えていただければ、任意につけていただいて構いません。以降で説明するDBアクセスクラスのプログラム側ではconfigparserを用いて、この設定ファイルの内容を取得して使用します。

【dbconfig.ini】DBアクセス設定ファイル(再掲)

[POSTGRESQL_DB_SERVER]
host = localhost
port = 5432
dbname = test
user = test
password = PAssw0rd

具体的には、host(ホスト)、port(ポート)、dbname(データベース名)、user(ユーザー名)、password(パスワード)を記載しています。

hostについては、具体的なサーバーのホスト名やIPアドレスでも構いません。今回はローカルPCのDBを使うためlocalhostとしています。portについては、PostgreSQLでデフォルトで使用されるポートですが変更している場合は修正が必要です。dbname、user、passwordは環境にあわせて指定します。

設定ファイルにせずに上記情報をプログラムに直接記載しても問題ないのですが、メンテナンスしづらいので設定ファイルとして切り出しています。また、パスワードをそのまま平文で記載しておくのはよろしくありませんが、今回は簡単にするためにそのままとしています。

DBアクセス用クラス DbConnectPostgres

DBアクセス用のクラスとしてDbConnectPostgresというクラスを定義しています。

メソッドとして定義しているものは以下になります。

メソッド名概要
__init__コンストラクタ。処理内で設定ファイルから接続情報を読み出し、DBへのコネクション確立とカーソル作成を行います。
execute_non_queryCREATE/INSERT/UPDATE/DELETEの処理用のSQL実行メソッド。SQL文と必要に応じてバインド変数を渡して処理を実行します。
execute_querySELECTの処理用のSQL実行メソッド。返却値は辞書のリストとなる。SQL文と必要に応じてバインド変数、データ取得件数を渡して処理を実行します。
commitコミットして変更内容を確定します。
rollbackロールバックして変更内容を破棄します。
__del__デストラクタ。使われなくなったタイミングで、データベースのカーソルとコネクションをクローズします。

それぞれのメソッドのポイントとなる内容について以降で解説していきます。

コンストラクタ:__init__

コンストラクタでは、DBへの接続を確立します。まずは、以下の部分でコンフィグファイルから設定を取得してきます。

        # コンフィグファイルからデータを取得
        config_db = configparser.ConfigParser()
        config_db.read('dbconfig.ini')

        # 接続情報の取得
        host = config_db['POSTGRESQL_DB_SERVER']['host']
        port = config_db['POSTGRESQL_DB_SERVER']['port']
        dbname = config_db['POSTGRESQL_DB_SERVER']['dbname']
        user = config_db['POSTGRESQL_DB_SERVER']['user']
        password = config_db['POSTGRESQL_DB_SERVER']['password']

上記部分により、hostなどの情報を設定ファイルから取得できます。その後、以下の部分で接続のためのコネクション確立とDB操作のためのカーソルを用意しています。

        # コネクションを確立する
        self.con = psycopg2.connect(
            host=host,
            port=port,
            dbname=dbname,
            user=user,
            password=password
        )
        self.con.set_client_encoding('utf-8')

        # カーソルを作成する
        self.cursor = self.con.cursor(
            cursor_factory=psycopg2.extras.DictCursor
        )

connectメソッドに設定ファイルから取得した各種情報を設定することで、コネクションが確立できます。また、カーソルはコネクションのcursorメソッドで作成できます。

カーソルは辞書型で扱えるようにextras.DictCursorを設定をしています。

SQL実行:execute_non_query

SQL実行用のメソッドとしてexecute_non_queryを用意しています。後述するexecute_queryとの違いは返却値がないことで、CREATE/INSERT/UPDATE/DELETEなどのSQLを実行する用として作成しています。

    def execute_non_query(self, sql: str, bind_var: tuple = None) -> None:
        """ CREATE/INSERT/UPDATE/DELETEのSQL実行メソッド
        :param sql: 実行SQL
        :param bind_var: バインド変数
        :return: None
        """
        # SQLの実行
        if bind_var is None:
            self.cursor.execute(sql)
        else:
            # バインド変数がある場合は指定して実行
            self.cursor.execute(sql, bind_var)

SQLを実行するためには、カーソルのexecuteメソッドに対象となるSQLを渡します。また、バインド変数がある場合には引数に指定して実行することができます。

SQL実行:execute_query

SQL実行用のメソッドとしてもう一つexecute_queryを用意しています。上記のexecute_non_queryとの違いは、返却値があることでSELECTしてデータを取得する用として作成しています。

    def execute_query(self,
                      sql: str,
                      bind_var: tuple = None,
                      count: int = 0) -> list:
        """SELECTのSQL実行メソッド
        :param sql: 実行SQL
        :param bind_var: バインド変数
        :param count: データ取得件数
        :return: 結果リスト
        """
        # SQLの実行
        if bind_var is None:
            self.cursor.execute(sql)
        else:
            # バインド変数がある場合は指定して実行
            self.cursor.execute(sql, bind_var)

        result = []
        if count == 0:
            rows = self.cursor.fetchall()
            for row in rows:
                result.append(dict(row))
        else:
            # 件数指定がある場合はその件数分を取得する
            rows = self.cursor.fetchmany(count)
            for row in rows:
                result.append(dict(row))

        return result

SQLを実行するためには、カーソルのexecuteメソッドに対象となるSQLを渡します。また、バインド変数がある場合には引数に指定して実行することができます。

SELECT結果を全て取得する場合はfetchallメソッドで取得することができます。また、件数を指定する場合には、fetchmanyメソッドで取得件数を指定すると指定件数分のみ取得できます。また、今回は使用していませんが1件取得するfetchoneメソッドもあります。

返却値は'psycopg2.extras.DictRow'というクラスのインスタンスのリストとなっているので、for文で順に要素を取り出し結果リストにappendで追加しています。この際にdict()に渡していますが、これにより'psycopg2.extras.DictRow'型を辞書に変換しています。

コミット:commit

データベースへの変更を確定させるためにはコミットをする必要があります。そのためのメソッドとしてcommitメソッドを用意しています。

    def commit(self) -> None:
        """コミット
        :return: None
        """
        self.con.commit()

コネクションにはcommitメソッドが用意されているため、実行することでコミットすることが可能です。

ロールバック:rollback

コミット前のデータベースへの変更を取り消したい場合は、ロールバックしてデータベースの状態を元に戻す必要があります。そのためのメソッドとしてrollbackメソッドを用意しています。

    def rollback(self) -> None:
        """ロールバック
        :return: None
        """
        self.con.rollback()

コネクションにはrollbackメソッドが用意されているため、実行することでロールバックすることが可能です。

デストラクタ:__del__

クラスのインスタンスが使用されなくなったら、DBアクセスのために用意していたカーソルとコネクションを閉じる必要があります。そのため、デストラクタである__del__内でクローズ処理を実施しています。

    def __del__(self) -> None:
        """デストラクタ
        :return: None
        """
        self.cursor.close()
        self.con.close()

カーソル及びコネクションにはcloseメソッドが用意されているため、実行することでクローズ処理をすることができます。

DBアクセス用クラス DbConnectPostgresの使い方
~テーブル作成、データ追加、データ検索例~

main関数では、上記で定義したDbConnectPostgresクラスを使用して、テーブルの作成(CREATE TABLE)、データの追加(INSERT)、データ検索(SELECT)の使い方の例を記載しています。それぞれのパートの内容を順に説明します。

インスタンスの生成

まずは、以下のように作成したDBアクセスクラスをインスタンス化します。

db_postgre = DbConnectPostgres()
テーブルの作成(CREATE TABLE)

CREATE TABLEでテーブル作成しているのが以下の部分です。

    # ===== テーブルを作成する
    create_sql = "CREATE TABLE IF NOT EXISTS work.sample_table" \
                 "(" \
                 "id serial NOT NULL, " \
                 "str1 character varying(50), " \
                 "value1 integer, " \
                 "last_update_datetime timestamp, " \
                 "PRIMARY KEY (id)" \
                 ")"
    db_postgre.execute_non_query(create_sql)
    # コミット
    db_postgre.commit()

上記のようにCREATE用のSQL文を用意して、execute_non_queryメソッドに渡します。処理を確定する際には、commitメソッドを呼び出します。

データの登録(INSERT)

INSERTでデータを登録しているのが以下の部分です。

    # ===== データをINSERTする
    insert_sql = "INSERT INTO work.sample_table " \
                 "(str1, value1, last_update_datetime) " \
                 "VALUES(%s, %s, current_timestamp)"
    for i in range(5):
        input_str = f'test_str{i}'
        input_val = 10 * i
        bind = (input_str, input_val)
        db_postgre.execute_non_query(insert_sql, bind)
    # コミット
    db_postgre.commit()

今回の例では、5件データを追加しています。SQLでは、値を埋め込む部分を%sで記載し、バインド変数を用意して渡すことで入力値を変えつつINSERTしています。処理を確定する際には、commitメソッドを呼び出します。

なお、今回idはserialで用意されているため自動で採番がされます。そのため、このプログラムは実行するたびに5件ずつデータが追加されていきます。

データの検索(SELECT)

SELECTで登録したデータを検索しているのが以下の部分です。今回は3つほど例を用意しています。

    # ===== データを検索する (全て検索)
    print('===== 全件検索')
    select_sql = "SELECT * FROM work.sample_table"
    select_result = db_postgre.execute_query(select_sql)
    print(select_result)

    # ===== 件数を指定して検索する (以下の例は3件)
    print('===== 取得する件数の指定')
    select_sql = "SELECT * FROM work.sample_table"
    select_result = db_postgre.execute_query(select_sql, count=3)
    print(select_result)

    # ===== 条件を指定して検索する
    print('===== 条件を指定して実行する')
    select_sql = "SELECT * FROM work.sample_table WHERE value1 > %s"
    print('条件1')
    select_result = db_postgre.execute_query(select_sql, (10,))
    print(select_result)
    print('条件2')
    select_result = db_postgre.execute_query(select_sql, (30,))
    print(select_result)

全件検索する場合には「SELECT *」で検索すれば情報を取得できます。また、count引数に数字を指定すれば全件のうち指定した件数のみ取得できます。

条件を指定する場合には、WHERE句のあるSQLをそのまま渡してもいいですが、上記の例では条件の値を%sとして、バインド変数で値を変えながら実行しています。これにより条件に一致するデータのみ検索出来ていることが分かるかと思います。

以上が、今回作成してみたDBアクセスクラスの簡単な使い方です。psycopg2を用いたPostgreSQLデータベースへのアクセス方法として参考にしてみていただければと思います。