Pythonでcx_Oracleを用いてOracleデータベースへアクセスする方法について解説します。
Contents
PythonでOracleデータベースへアクセスする
業務アプリケーションなどを開発する際には、データベースとしてOracle社のOracleデータベースが使用することが多いかと思います。PythonのプログラムからOracleデータベースへアクセスする方法として、cx_Oracleモジュールを使用する方法があります。
cx_OracleはOracle社が自ら作成しているもので、基本的なSQLアクセスやPL/SQLの実行などの機能を提供しています。
cx_Oracleの公式ドキュメントページはこちらを参考にしてください。
cx_Oracleのインストール
cx_Oracleを使用するには、以下のコマンドを用いてpipでインストールを行ってください。
pip install cx_Oracle
pipインストールが完了すれば、cx_Oracleを用いてOracleデータベースへアクセスするための準備ができます。以降では、cx_Oracleのデータベースへアクセスするためのクラスを作って、Oracleデータベースへアクセスする方法について紹介していきます。
cx_Oracleを用いたOracleデータベースへのアクセス
以降では、cx_Oracleを使用してOracleデータベースへアクセスするためのプログラムを紹介していきます。
サンプルプログラムの前提条件
以降で紹介するサンプルプログラムを実行するにあたっての前提条件は以下とします。
- 無償版のOracleXEがローカルPCにインストールされているものとします。
- サービスとしてはデフォルトとして作成されるプラガブルデータベースのサービス「XEPDB1」を使用するものとします。
- ユーザーとして「TEST」が作成されておりパスワードは「PAssw0rd」とします。
- TESTユーザーは、データベースへのアクセス、テーブル作成、データ登録、検索といった権限が付与されているものとします。
- dbconfig.ini(詳細は後述)は、Pythonプログラムと同フォルダに配置するものとします。
今回は無償版のOracleXEを使用しますが、製品版のOracleでも同様に使用できます。以降で設定ファイルの記載も含めて紹介していますので、各環境にあわせて設定ファイルの接続先やサービス名、または一部プログラムを変更してもらえば異なる環境で動かすことが可能です。上記前提は、コードをそのまま実行する場合の前提条件ということでご理解ください。
Oracleは無償版のOracle Database Express Edition (XE)が提供されています。個人で勉強する場合や小規模なアプリケーション開発の際には使用することができます。機能としてはフル機能使用できますが、リソースに対して以下のような制限があるため注意が必要です。
- ユーザーデータとして使えるのは最大12GB
- データベースのメモリとして使えるのは最大2GB
- CPUスレッドは最大2つ
※上記制限は、記事公開時点のOracle Database 21c Express Editionの場合であり、バージョンなどにより変更されている可能性があります。
cx_Oracleを用いたOracleデータベースアクセスのためのサンプルプログラム
cx_Oracleを用いてOracleデータベースへアクセスするための設定ファイルおよびサンプルプログラム、実行結果は以下のようになります。
前述した前提に従った環境であれば、このままコピーしていただくだけでも使用することができるかと思います。それぞれの具体的な詳細については、次節で説明します。
【dbconfig.ini】DBアクセス設定ファイル
[ORACLE_DB_SERVER] host = localhost port = 1521 service = XEPDB1 user = TEST password = PAssw0rd
【db_connect_oracle.py】DBアクセス用クラスと使い方例を含むプログラム
import cx_Oracle import configparser class DbConnectOracle: def __init__(self) -> None: """コンストラクタ :return: None """ # コンフィグファイルからデータを取得 config_db = configparser.ConfigParser() config_db.read("dbconfig.ini") # 接続情報の取得 host = config_db["ORACLE_DB_SERVER"]["host"] port = config_db["ORACLE_DB_SERVER"]["port"] service_name = config_db["ORACLE_DB_SERVER"]["service"] user = config_db["ORACLE_DB_SERVER"]["user"] password = config_db["ORACLE_DB_SERVER"]["password"] # DSN(データソース名)を作成する self.dsn = cx_Oracle.makedsn(host, port, service_name=service_name) # コネクションを確立する self.con = cx_Oracle.connect( user=user, password=password, dsn=self.dsn, encoding="UTF-8" ) # カーソルを作成する self.cursor = self.con.cursor() def execute_non_query(self, sql: str, bind_var: dict = None) -> None: """CREATE/INSERT/UPDATE/DELETEのSQL実行メソッド :param sql: 実行SQL :param bind_var: バインド変数 :return: None """ # SQLの実行 if bind_var is None: self.cursor.execute(sql) else: # バインド変数がある場合は指定して実行 self.cursor.execute(sql, bind_var) def execute_query(self, sql: str, bind_var: dict = None, count: int = 0) -> list: """SELECTのSQL実行メソッド :param sql: 実行SQL :param bind_var: バインド変数 :param count: データ取得件数 :return: 結果リスト """ # SQLの実行 if bind_var is None: self.cursor.execute(sql) else: # バインド変数がある場合は指定して実行 self.cursor.execute(sql, bind_var) result = [] if count == 0: columns = [col[0] for col in self.cursor.description] self.cursor.rowfactory = lambda *args: dict(zip(columns, args)) rows = self.cursor.fetchall() for row in rows: result.append(row) else: # 件数指定がある場合はその件数分を取得する columns = [col[0] for col in self.cursor.description] self.cursor.rowfactory = lambda *args: dict(zip(columns, args)) rows = self.cursor.fetchmany(count) for row in rows: result.append(row) return result def commit(self) -> None: """コミット :return: None """ self.con.commit() def rollback(self) -> None: """ロールバック :return: None """ self.con.rollback() def __del__(self) -> None: """デストラクタ :return: None """ self.cursor.close() self.con.close() def main(): db_oracle = DbConnectOracle() # ===== テーブルを作成する create_sql = ( "CREATE TABLE SAMPLE_TABLE" "(" "ID NUMBER GENERATED ALWAYS AS IDENTITY," "STR1 VARCHAR2(50), " "VALUE1 NUMBER, " "LAST_UPDATE_DATETIME TIMESTAMP, " "PRIMARY KEY (ID)" ")" ) db_oracle.execute_non_query(create_sql) # コミット db_oracle.commit() # ===== データをINSERTする insert_sql = ( "INSERT INTO SAMPLE_TABLE " "(STR1, VALUE1, LAST_UPDATE_DATETIME) " "VALUES(:str1, :val1, CURRENT_TIMESTAMP)" ) for i in range(5): input_str = f"test_str{i}" input_val = 10 * i bind = {"str1": input_str, "val1": input_val} db_oracle.execute_non_query(insert_sql, bind) # コミット db_oracle.commit() # ===== データを検索する (全て検索) print("===== 全件検索") select_sql = "SELECT * FROM SAMPLE_TABLE" select_result = db_oracle.execute_query(select_sql) print(select_result) # ===== 件数を指定して検索する (以下の例は3件) print("===== 取得する件数の指定") select_sql = "SELECT * FROM SAMPLE_TABLE" select_result = db_oracle.execute_query(select_sql, count=3) print(select_result) # ===== 条件を指定して検索する print("===== 条件を指定して実行する") select_sql = "SELECT * FROM SAMPLE_TABLE WHERE VALUE1 > :val1" print("条件1") select_result = db_oracle.execute_query(select_sql, {"val1": 10}) print(select_result) print("条件2") select_result = db_oracle.execute_query(select_sql, {"val1": 30}) print(select_result) if __name__ == "__main__": main()
【実行結果】 ===== 全件検索 [{'ID': 1, 'STR1': 'test_str0', 'VALUE1': 0, 'LAST_UPDATE_DATETIME': datetime.datetime(2022, 4, 15, 6, 15, 26, 318000)}, {'ID': 2, 'STR1': 'test_str1', 'VALUE1': 10, 'LAST_UPDATE_DATETIME': datetime.datetime(2022, 4, 15, 6, 15, 26, 318000)}, {'ID': 3, 'STR1': 'test_str2', 'VALUE1': 20, 'LAST_UPDATE_DATETIME': datetime.datetime(2022, 4, 15, 6, 15, 26, 318000)}, {'ID': 4, 'STR1': 'test_str3', 'VALUE1': 30, 'LAST_UPDATE_DATETIME': datetime.datetime(2022, 4, 15, 6, 15, 26, 318000)}, {'ID': 5, 'STR1': 'test_str4', 'VALUE1': 40, 'LAST_UPDATE_DATETIME': datetime.datetime(2022, 4, 15, 6, 15, 26, 318000)}] ===== 取得する件数の指定 [{'ID': 1, 'STR1': 'test_str0', 'VALUE1': 0, 'LAST_UPDATE_DATETIME': datetime.datetime(2022, 4, 15, 6, 15, 26, 318000)}, {'ID': 2, 'STR1': 'test_str1', 'VALUE1': 10, 'LAST_UPDATE_DATETIME': datetime.datetime(2022, 4, 15, 6, 15, 26, 318000)}, {'ID': 3, 'STR1': 'test_str2', 'VALUE1': 20, 'LAST_UPDATE_DATETIME': datetime.datetime(2022, 4, 15, 6, 15, 26, 318000)}] ===== 条件を指定して実行する 条件1 [{'ID': 3, 'STR1': 'test_str2', 'VALUE1': 20, 'LAST_UPDATE_DATETIME': datetime.datetime(2022, 4, 15, 6, 15, 26, 318000)}, {'ID': 4, 'STR1': 'test_str3', 'VALUE1': 30, 'LAST_UPDATE_DATETIME': datetime.datetime(2022, 4, 15, 6, 15, 26, 318000)}, {'ID': 5, 'STR1': 'test_str4', 'VALUE1': 40, 'LAST_UPDATE_DATETIME': datetime.datetime(2022, 4, 15, 6, 15, 26, 318000)}] 条件2 [{'ID': 5, 'STR1': 'test_str4', 'VALUE1': 40, 'LAST_UPDATE_DATETIME': datetime.datetime(2022, 4, 15, 6, 15, 26, 318000)}]
細かな例外処理はしていませんのでご注意ください。プログラムを参考にして使用いただいても構いませんが、ご自身の責任にて使用をお願いします。
サンプルコードの詳細説明
DB接続設定ファイル dbconfig.ini
まずは、DB接続設定ファイルをdbconfig.iniとして用意します。ファイル名は呼び出し時の指定と揃えていただければ、任意につけていただいて構いません。以降で説明するDBアクセスクラスのプログラム側ではconfigparserを用いて、この設定ファイルの内容を取得して使用します。
【dbconfig.ini】DBアクセス設定ファイル(再掲)
[ORACLE_DB_SERVER] host = localhost port = 1521 service = XEPDB1 user = TEST password = PAssw0rd
具体的には、host(ホスト)、port(ポート)、service(サービス名)、user(ユーザー名)、password(パスワード)を記載しています。
hostについては、具体的なサーバーのホスト名やIPアドレスでも構いません。今回はローカルPCのDBを使うためlocalhostとしています。portについては、Oracleでデフォルトで使用されるポートです。service、user、passwordは環境にあわせて指定します。
設定ファイルにせずに上記情報をプログラムに直接記載しても問題ないのですが、メンテナンスしづらいので設定ファイルとして切り出しています。また、パスワードをそのまま平文で記載しておくのはよろしくありませんが、今回は簡単にするためそのままとしています。
DBアクセス用クラス DBConnectOracle
DBアクセス用のクラスとしてDbConnectOracleというクラスを定義しています。
メソッドとして定義しているものは以下になります。
メソッド名 | 概要 |
---|---|
__init__ | コンストラクタ。処理内で設定ファイルから接続情報を読み出し、DBへのコネクション確立とカーソル作成を行います。 |
execute_non_query | CREATE/INSERT/UPDATE/DELETEの処理用のSQL実行メソッド。SQL文と必要に応じてバインド変数を渡して処理を実行します。 |
execute_query | SELECTの処理用のSQL実行メソッド。返却値は辞書のリストとなる。SQL文と必要に応じてバインド変数、データ取得件数を渡して処理を実行します。 |
commit | コミットして変更内容を確定します。 |
rollback | ロールバックして変更内容を破棄します。 |
__del__ | デストラクタ。使われなくなったタイミングで、データベースのカーソルとコネクションをクローズします。 |
それぞれのメソッドのポイントとなる内容について以降で解説していきます。
コンストラクタ:__init__
コンストラクタでは、DBへの接続を確立します。まずは、以下の部分でコンフィグファイルから設定を取得してきます。
def __init__(self) -> None: """コンストラクタ :return: None """ # コンフィグファイルからデータを取得 config_db = configparser.ConfigParser() config_db.read('dbconfig.ini') # 接続情報の取得 host = config_db['ORACLE_DB_SERVER']['host'] port = config_db['ORACLE_DB_SERVER']['port'] service_name = config_db['ORACLE_DB_SERVER']['service'] user = config_db['ORACLE_DB_SERVER']['user'] password = config_db['ORACLE_DB_SERVER']['password']
上記部分により、hostなどの情報を設定ファイルから取得できます。その後、以下の部分で接続のためのコネクション確立とDB操作のためのカーソルを用意しています。
# DSN(データソース名)を作成する self.dsn = cx_Oracle.makedsn(host, port, service_name=service_name) # コネクションを確立する self.con = cx_Oracle.connect( user=user, password=password, dsn=self.dsn, encoding='UTF-8' ) # カーソルを作成する self.cursor = self.con.cursor()
DSN(データソース名)は、makedsnメソッドで組み立てることができます。self.dsnをprintしてみてもらえれば分かりますが、以下のような接続用の文字列を組みたてることができます。
(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=localhost)(PORT=1521))(CONNECT_DATA=(SERVICE_NAME=XEPDB1)))
connectメソッドに、user、password、dsn、encodingなどを指定することでコネクションが確立できます。また、カーソルはコネクションのcursorメソッドで作成できます。
SQL実行:execute_non_query
SQL実行用のメソッドとしてexecute_non_queryを用意しています。後述するexecute_queryとの違いは返却値がないことで、CREATE/INSERT/UPDATE/DELETEなどのSQLを実行する用として作成しています。
def execute_non_query(self, sql: str, bind_var: dict = None) -> None: """CREATE/INSERT/UPDATE/DELETEのSQL実行メソッド :param sql: 実行SQL :param bind_var: バインド変数 :return: None """ # SQLの実行 if bind_var is None: self.cursor.execute(sql) else: # バインド変数がある場合は指定して実行 self.cursor.execute(sql, bind_var)
SQLを実行するためには、カーソルのexecuteメソッドに対象となるSQLを渡します。また、バインド変数がある場合には、引数に辞書を設定して実行することができます。
SQL実行:execute_query
SQL実行用のメソッドとしてもう一つexecute_queryを用意しています。上記のexecute_non_queryとの違いは返却値があることでSELECTしてデータを取得する用として作成しています。
def execute_query(self, sql: str, bind_var: dict = None, count: int = 0) -> list: """SELECTのSQL実行メソッド :param sql: 実行SQL :param bind_var: バインド変数 :param count: データ取得件数 :return: 結果リスト """ # SQLの実行 if bind_var is None: self.cursor.execute(sql) else: # バインド変数がある場合は指定して実行 self.cursor.execute(sql, bind_var) result = [] if count == 0: columns = [col[0] for col in self.cursor.description] self.cursor.rowfactory = lambda *args: dict(zip(columns, args)) rows = self.cursor.fetchall() for row in rows: result.append(row) else: # 件数指定がある場合はその件数分を取得する columns = [col[0] for col in self.cursor.description] self.cursor.rowfactory = lambda *args: dict(zip(columns, args)) rows = self.cursor.fetchmany(count) for row in rows: result.append(row) return result
SQLを実行するためには、カーソルのexecuteメソッドに対象となるSQLを渡します。また、バインド変数がある場合には、引数に辞書を設定して実行することができます。
SELECT結果の全てを取得する場合は、fetchallメソッドで取得することができます。また、件数を指定する場合には、fetchmanyメソッドで取得件数を指定すると指定件数分のみ取得できます。また、今回は使用していませんが1件取得するfetchoneメソッドもあります。
cx_Oracleは返却値は、結果セットをタプルのリストで返します。しかし、辞書の方が列名が分かり扱いやすい場合があるので、以下の部分で辞書で返却するような設定にしています。
columns = [col[0] for col in self.cursor.description] self.cursor.rowfactory = lambda *args: dict(zip(columns, args))
columnsの部分ではカーソルのdescriptionメソッドで列名を取得しています。その後、カーソルのrowfactoryにlamda式として列名と値の辞書に変換する無名関数を設定しています。これによりSELECTの結果を辞書に変換してくれます。なお、上記の部分をコメントアウトすれば、結果はタプルのリストになります。
コミット:commit
データベースへの変更を確定させるためにはコミットをする必要があります。そのためのメソッドとしてcommitメソッドを用意しています。
def commit(self) -> None: """コミット :return: None """ self.con.commit()
コネクションにはcommitメソッドが用意されているため、実行することでコミットすることが可能です。
ロールバック:rollback
コミット前のデータベースへの変更を取り消したい場合は、ロールバックしてデータベースの状態を元に戻す必要があります。そのためのメソッドとしてrollbackメソッドを用意しています。
def rollback(self) -> None: """ロールバック :return: None """ self.con.rollback()
コネクションにはrollbackメソッドが用意されているため、実行することでロールバックすることが可能です。
デストラクタ:__del__
クラスのインスタンスが使用されなくなったら、DBアクセスのために用意していたカーソルとコネクションを閉じる必要があります。そのため、デストラクタである__del__内でクローズ処理を実施しています。
def __del__(self) -> None: """デストラクタ :return: None """ self.cursor.close() self.con.close()
カーソル及びコネクションにはcloseメソッドが用意されているため、実行することでクローズ処理をすることができます。
DBアクセス用クラス DbConnectOracleの使い方
~テーブル作成、データ追加、データ検索例~
main関数では、上記で定義したDbConnectOracleクラスを使用して、テーブルの作成(CREATE TABLE)、データの追加(INSERT)、データ検索(SELECT)の使い方の例を記載しています。それぞれのパートの内容を順に説明します。
インスタンスの生成
まずは、以下のように作成したDBアクセスクラスをインスタンス化します。
db_oracle = DbConnectOracle()
テーブルの作成(CREATE TABLE)
CREATE TABLEでテーブルを作成しているのが以下の部分です。
# ===== テーブルを作成する create_sql = "CREATE TABLE SAMPLE_TABLE" \ "(" \ "ID NUMBER GENERATED ALWAYS AS IDENTITY," \ "STR1 VARCHAR2(50), " \ "VALUE1 NUMBER, " \ "LAST_UPDATE_DATETIME TIMESTAMP, " \ "PRIMARY KEY (ID)" \ ")" db_oracle.execute_non_query(create_sql) # コミット db_oracle.commit()
上記のようにCREATE用のSQLを用意して、execute_non_queryメソッドに渡します。処理を確定する際にはcommitメソッドを呼び出します。
テーブルの存在有無確認などは含めていないSQLなので、複数回実行すると既にテーブルがあるためエラーとなります。その場合は、「テーブルをDROPする」又は「CREATE TABLE部分のプログラムをコメントアウトする」などしてみてください。
データの登録(INSERT)
INSERTでデータを登録しているのが以下の部分です。
# ===== データをINSERTする insert_sql = "INSERT INTO SAMPLE_TABLE " \ "(STR1, VALUE1, LAST_UPDATE_DATETIME) " \ "VALUES(:str1, :val1, CURRENT_TIMESTAMP)" for i in range(5): input_str = f'test_str{i}' input_val = 10 * i bind = {'str1': input_str, 'val1': input_val} db_oracle.execute_non_query(insert_sql, bind) # コミット db_oracle.commit()
今回の例では、5件のデータを追加しています。SQLには値を埋め込む部分に「:str1」、 「:val1」といったバインド変数が用意してあり、execute_non_queryメソッドにて辞書形式でバインド変数と値を指定することでINSERTを実行しています。処理を確定する際には、commitメソッドを呼び出します。
なお、今回IDは「GENERATED ALWAYS AS IDENTITY」という指定をしているので、自動で採番がされます。
データの検索(SELECT)
SELECTで登録したデータを検索しているのが以下の部分です。今回は3つほど例を用意しています。
# ===== データを検索する (全て検索) print('===== 全件検索') select_sql = "SELECT * FROM SAMPLE_TABLE" select_result = db_oracle.execute_query(select_sql) print(select_result) # ===== 件数を指定して検索する (以下の例は3件) print('===== 取得する件数の指定') select_sql = "SELECT * FROM SAMPLE_TABLE" select_result = db_oracle.execute_query(select_sql, count=3) print(select_result) # ===== 条件を指定して検索する print('===== 条件を指定して実行する') select_sql = "SELECT * FROM SAMPLE_TABLE WHERE VALUE1 > :val1" print('条件1') select_result = db_oracle.execute_query(select_sql, {'val1': 10}) print(select_result) print('条件2') select_result = db_oracle.execute_query(select_sql, {'val1': 30}) print(select_result)
全件検索する場合には「SELECT *」で検索すれば情報を取得できます。また、count引数に数字を指定すれば全件のうち指定した件数のみ取得できます。
条件を指定する場合には、WHERE句のあるSQLをそのまま渡してもいいですが、上記の例では条件の値を「:val1」としてバインド変数で値を変えながら実行しています。これにより条件に一致するデータのみ検索出来ていることが分かるかと思います。
以上が、今回作成してみたDBアクセスクラスの簡単な使い方です。cx_Oracleを用いたOracleデータベースへのアクセス方法として参考にしてみていただければと思います。
上記で紹介しているソースコードについてはgithubにて公開しています。参考にしていただければと思います。