Pythonでmysqlclientを用いてMySQLデータベースへアクセスする方法について解説します。
Contents
PythonでMySQLデータベースへアクセスする
PythonでMySQLのデータベースへアクセスする方法として、mysqlclientモジュールを使用する方法があります。
本記事では、mysqlclientを用いたMySQLデータベースへのアクセス用クラスを作って使ってみます。
mysqlclientの公式ドキュメントページはこちらを参考にしてください。
mysqlclientのインストール
mysqlclientを使用するには、以下のコマンドを用いてpipでインストールを行ってください。
pip install mysqlclient
pipインストールが完了すれば、mysqlclientを用いてMySQLデータベースへアクセスするための準備ができます。以降では、MySQLデータベースへアクセスするためのクラスを作って、MySQLデータベースへアクセスする方法について紹介していきます。
mysqlclientを用いたMySQLデータベースへのアクセス
以降では、mysqlclientを使用してMySQLデータベースへアクセスするためのプログラムを紹介していきます。
サンプルプログラムの前提条件
以降で紹介するサンプルプログラムを実行するにあたっての前提条件は以下とします。
- MySQLがローカルPCにインストールされているものとします。
- スキーマとして「work」が作成されているものとします。
- ユーザーとして「test」が作成されており、パスワードは「PAssw0rd」とします。
- testユーザは、workスキーマに対してテーブルの作成やデータの追加といった権限が設定されているものとします。
- dbconfig.ini(詳細は後述)は、Pythonプログラムと同フォルダに配置するものとします。
以降で設定ファイルの記載も含めて紹介しますので、各環境にあわせて設定ファイルやプログラムを少し変更してもらえば異なる環境で動かすことも可能です。コードをそのまま実行する場合の前提条件ということでご理解ください。
mysqlclientを用いたMySQLデータベースアクセスのためのサンプルプログラム
mysqlclientを用いてMySQLデータベースへアクセスするための設定ファイルおよびサンプルプログラム、実行結果は以下のようになります。
前述した前提に従った環境であれば、このままコピーしていただくだけでも使用することができるかと思います。それぞれの具体的な詳細については次節で説明します。
【dbconfig.ini】DBアクセス設定ファイル
[MYSQL_DB_SERVER] host = localhost port = 3306 dbname = work user = test password = PAssw0rd
【db_connect_mysql.py】DBアクセス用クラスと使い方例を含むプログラム
"""DBアクセス (MySQL) """ import MySQLdb import configparser class DbConnectMySQL: def __init__(self) -> None: """コンストラクタ :return: """ # コンフィグファイルからデータを取得 config_db = configparser.ConfigParser() config_db.read('dbconfig.ini') # 接続情報の取得 host = config_db['MYSQL_DB_SERVER']['host'] port = int(config_db['MYSQL_DB_SERVER']['port']) dbname = config_db['MYSQL_DB_SERVER']['dbname'] user = config_db['MYSQL_DB_SERVER']['user'] password = config_db['MYSQL_DB_SERVER']['password'] # コネクションを確立する self.con = MySQLdb.connect( host=host, port=port, user=user, passwd=password, db=dbname ) # カーソルを作成する self.cursor = self.con.cursor( MySQLdb.cursors.DictCursor ) def execute_non_query(self, sql: str, bind_var: tuple = None) -> None: """CREATE/INSERT/UPDATE/DELETEのSQL実行メソッド :param sql: 実行SQL :param bind_var: バインド変数 :return: None """ # SQLの実行 if bind_var is None: self.cursor.execute(sql) else: # バインド変数がある場合は指定して実行 self.cursor.execute(sql, bind_var) def execute_query(self, sql: str, bind_var: tuple = None, count: int = 0) -> list: """SELECTのSQL実行メソッド :param sql: 実行SQL :param bind_var: バインド変数 :param count: データ取得件数 :return: 結果リスト """ # SQLの実行 if bind_var is None: self.cursor.execute(sql) else: # バインド変数がある場合は指定して実行 self.cursor.execute(sql, bind_var) result = [] if count == 0: rows = self.cursor.fetchall() for row in rows: result.append(row) else: # 件数指定がある場合はその件数分を取得する rows = self.cursor.fetchmany(count) for row in rows: result.append(row) return result def commit(self) -> None: """コミット :return: None """ self.con.commit() def rollback(self) -> None: """ロールバック :return: None """ self.con.rollback() def __del__(self) -> None: """デストラクタ :return: """ self.cursor.close() self.con.close() def main(): db_mysql = DbConnectMySQL() # ===== テーブルを作成する create_sql = "CREATE TABLE IF NOT EXISTS sample_table" \ "(" \ "id INT NOT NULL AUTO_INCREMENT, " \ "str1 VARCHAR(50), " \ "value1 INT, " \ "last_update_datetime TIMESTAMP, " \ "PRIMARY KEY (id)" \ ")" db_mysql.execute_non_query(create_sql) # コミット db_mysql.commit() # ===== データをINSERTする insert_sql = "INSERT INTO sample_table " \ "(str1, value1, last_update_datetime) " \ "VALUES(%s, %s, CURRENT_TIMESTAMP)" for i in range(5): input_str = f'test_str{i}' input_val = 10 * i bind = (input_str, input_val) db_mysql.execute_non_query(insert_sql, bind) # コミット db_mysql.commit() # ===== データを検索する (全て検索) print('===== 全件検索') select_sql = "SELECT * FROM sample_table" select_result = db_mysql.execute_query(select_sql) print(select_result) # ===== 件数を指定して検索する (以下の例は3件) print('===== 取得する件数の指定') select_sql = "SELECT * FROM sample_table" select_result = db_mysql.execute_query(select_sql, count=3) print(select_result) # ===== 条件を指定して検索する print('===== 条件を指定して実行する') select_sql = "SELECT * FROM sample_table WHERE value1 > %s" print('条件1') select_result = db_mysql.execute_query(select_sql, (10,)) print(select_result) print('条件2') select_result = db_mysql.execute_query(select_sql, (30,)) print(select_result) if __name__ == '__main__': main()
【実行結果】 ===== 全件検索 [{'id': 1, 'str1': 'test_str0', 'value1': 0, 'last_update_datetime': datetime.datetime(2022, 4, 16, 18, 48, 17)}, {'id': 2, 'str1': 'test_str1', 'value1': 10, 'last_update_datetime': datetime.datetime(2022, 4, 16, 18, 48, 17)}, {'id': 3, 'str1': 'test_str2', 'value1': 20, 'last_update_datetime': datetime.datetime(2022, 4, 16, 18, 48, 17)}, {'id': 4, 'str1': 'test_str3', 'value1': 30, 'last_update_datetime': datetime.datetime(2022, 4, 16, 18, 48, 17)}, {'id': 5, 'str1': 'test_str4', 'value1': 40, 'last_update_datetime': datetime.datetime(2022, 4, 16, 18, 48, 17)}] ===== 取得する件数の指定 [{'id': 1, 'str1': 'test_str0', 'value1': 0, 'last_update_datetime': datetime.datetime(2022, 4, 16, 18, 48, 17)}, {'id': 2, 'str1': 'test_str1', 'value1': 10, 'last_update_datetime': datetime.datetime(2022, 4, 16, 18, 48, 17)}, {'id': 3, 'str1': 'test_str2', 'value1': 20, 'last_update_datetime': datetime.datetime(2022, 4, 16, 18, 48, 17)}] ===== 条件を指定して実行する 条件1 [{'id': 3, 'str1': 'test_str2', 'value1': 20, 'last_update_datetime': datetime.datetime(2022, 4, 16, 18, 48, 17)}, {'id': 4, 'str1': 'test_str3', 'value1': 30, 'last_update_datetime': datetime.datetime(2022, 4, 16, 18, 48, 17)}, {'id': 5, 'str1': 'test_str4', 'value1': 40, 'last_update_datetime': datetime.datetime(2022, 4, 16, 18, 48, 17)}] 条件2 [{'id': 5, 'str1': 'test_str4', 'value1': 40, 'last_update_datetime': datetime.datetime(2022, 4, 16, 18, 48, 17)}]
細かな例外処理はしていませんのでご注意ください。プログラムを参考にして使用いただいても構いませんが、ご自身の責任にてお願いします。
サンプルコードの詳細説明
DB接続設定ファイル dbconfig.ini
まずは、DB接続設定ファイルをdbconfig.iniとして用意します。ファイル名は呼び出し時の指定と揃えていただければ、任意につけていただいて構いません。以降で説明するDBアクセスクラスのプログラム側ではconfigparserを用いて、この設定ファイルの内容を取得して使用します。
【dbconfig.ini】DBアクセス設定ファイル(再掲)
[MYSQL_DB_SERVER] host = localhost port = 3306 dbname = work user = test password = PAssw0rd
具体的には、host(ホスト)、port(ポート)、dbname(スキーマ名)、user(ユーザー名)、password(パスワード)を記載しています。
hostについては、具体的なサーバーのホスト名やIPアドレスでも構いません。今回はローカルPCのDBを使うためlocalhostとしています。portについては、MySQLでデフォルトで使用されるポートですが変更している場合は修正が必要です。dbname、user、passwordは環境にあわせて指定します。
設定ファイルにせずに上記情報をプログラムに直接記載しても問題ないのですが、メンテナンスしづらいので設定ファイルとして切り出しています。また、パスワードをそのまま平文で記載しておくのはよろしくありませんが、今回は簡単にするためにそのままとしています。
DBアクセス用クラス DbConnectMySQL
DBアクセス用のクラスとしてDbConnectMySQLというクラスを定義しています。
メソッドとして定義しているものは以下となります。
メソッド名 | 概要 |
---|---|
__init__ | コンストラクタ。処理内で設定ファイルから接続情報を読み出し、DBへのコネクション確立とカーソル作成を行います。 |
execute_non_query | CREATE/INSERT/UPDATE/DELETEの処理用のSQL実行メソッド。SQL文と必要に応じてバインド変数を渡して処理を実行します。 |
execute_query | SELECTの処理用のSQL実行メソッド。返却値は辞書のリストとなる。SQL文と必要に応じてバインド変数、データ取得件数を渡して処理を実行します。 |
commit | コミットして変更内容を確定します。 |
rollback | ロールバックして変更内容を破棄します。 |
__del__ | デストラクタ。使われなくなったタイミングで、データベースのカーソルとコネクションをクローズします。 |
それぞれのメソッドのポイントとなる内容について以降で解説していきます。
コンストラクタ:__init__
コンストラクタでは、DBへの接続を確立します。まずは、以下の部分でコンフィグファイルから設定を取得してきます。
# コンフィグファイルからデータを取得 config_db = configparser.ConfigParser() config_db.read('dbconfig.ini') # 接続情報の取得 host = config_db['MYSQL_DB_SERVER']['host'] port = int(config_db['MYSQL_DB_SERVER']['port']) dbname = config_db['MYSQL_DB_SERVER']['dbname'] user = config_db['MYSQL_DB_SERVER']['user'] password = config_db['MYSQL_DB_SERVER']['password']
上記部分により、hostなどの情報を設定ファイルから取得できます。なお、次のconnectメソッドに渡すportについては、intである必要があるためintに変換しています。
その後、以下の部分で接続のためのコネクション確立とDB操作のためのカーソルを用意しています。
# コネクションを確立する self.con = MySQLdb.connect( host=host, port=port, user=user, passwd=password, db=dbname ) # カーソルを作成する self.cursor = self.con.cursor( MySQLdb.cursors.DictCursor )
connectメソッドに設定ファイルから取得した各種情報を設定することで、コネクションが確立できます。また、カーソルはコネクションのcursorメソッドで作成できます。
カーソルは辞書型で扱えるようにMySQLdb.cursors.DictCursorを設定しています。
SQL実行:execute_non_query
SQL実行用のメソッドとしてexecute_non_queryを用意しています。後述するexecute_queryとの違いは返却値がないことで、CREATE/INSERT/UPDATE/DELETEなどのSQLを実行する用として作成しています。
def execute_non_query(self, sql: str, bind_var: tuple = None) -> None: """CREATE/INSERT/UPDATE/DELETEのSQL実行メソッド :param sql: 実行SQL :param bind_var: バインド変数 :return: None """ # SQLの実行 if bind_var is None: self.cursor.execute(sql) else: # バインド変数がある場合は指定して実行 self.cursor.execute(sql, bind_var)
SQLを実行するためには、カーソルのexecuteメソッドに対象となるSQLを渡します。また、バインド変数がある場合には引数に指定して実行することができます。
SQL実行:execute_query
SQL実行用のメソッドとしてもう一つexecute_queryを用意しています。上記のexecute_non_queryとの違いは、返却値があることでSELECTしてデータを取得する用として作成しています。
def execute_query(self, sql: str, bind_var: tuple = None, count: int = 0) -> list: """SELECTのSQL実行メソッド :param sql: 実行SQL :param bind_var: バインド変数 :param count: データ取得件数 :return: 結果リスト """ # SQLの実行 if bind_var is None: self.cursor.execute(sql) else: # バインド変数がある場合は指定して実行 self.cursor.execute(sql, bind_var) result = [] if count == 0: rows = self.cursor.fetchall() for row in rows: result.append(row) else: # 件数指定がある場合はその件数分を取得する rows = self.cursor.fetchmany(count) for row in rows: result.append(row) return result
SQLを実行するためには、カーソルのexecuteメソッドに対象となるSQLを渡します。また、バインド変数がある場合には引数に指定して実行することができます。
SELECT結果を全て取得する場合はfetchallメソッドで取得することができます。また、件数を指定する場合には、fetchmanyメソッドで取得件数を指定すると指定件数分のみ取得できます。また、今回は使用していませんが1件取得するfetchoneメソッドもあります。
コミット:commit
データベースへの変更を確定させるためにはコミットをする必要があります。そのためのメソッドとしてcommitメソッドを用意しています。
def commit(self) -> None: """コミット :return: None """ self.con.commit()
コネクションにはcommitメソッドが用意されているため、実行することでコミットすることが可能です。
ロールバック:rollback
コミット前のデータベースへの変更を取り消したい場合は、ロールバックしてデータベースの状態を元に戻す必要があります。そのためのメソッドとしてrollbackメソッドを用意しています。
def rollback(self) -> None: """ロールバック :return: None """ self.con.rollback()
コネクションにはrollbackメソッドが用意されているため、実行することでロールバックすることが可能です。
デストラクタ:__del__
クラスのインスタンスが使用されなくなったら、DBアクセスのために用意していたカーソルとコネクションを閉じる必要があります。そのため、デストラクタである__del__内でクローズ処理を実施しています。
def __del__(self) -> None: """デストラクタ :return: None """ self.cursor.close() self.con.close()
カーソル及びコネクションにはcloseメソッドが用意されているため、実行することでクローズ処理をすることができます。
DBアクセス用クラス DbConnectMySQLの使い方
~テーブル作成、データ追加、データ検索例~
main関数では、上記で定義したDbConnectMySQLクラスを使用して、テーブルの作成(CREATE TABLE)、データの追加(INSERT)、データ検索(SELECT)の使い方の例を記載しています。それぞれのパートの内容を順に説明します。
インスタンスの生成
まずは、以下のように作成したDBアクセスクラスをインスタンス化します。
db_mysql = DbConnectMySQL()
テーブルの作成(CREATE TABLE)
CREATE TABLEでテーブル作成しているのが以下の部分です。
# ===== テーブルを作成する create_sql = "CREATE TABLE IF NOT EXISTS sample_table" \ "(" \ "id INT NOT NULL AUTO_INCREMENT, " \ "str1 VARCHAR(50), " \ "value1 INT, " \ "last_update_datetime TIMESTAMP, " \ "PRIMARY KEY (id)" \ ")" db_mysql.execute_non_query(create_sql) # コミット db_mysql.commit()
上記のようにCREATE用のSQL文を用意して、execute_non_queryメソッドに渡します。処理を確定する際には、commitメソッドを呼び出します。
データの登録(INSERT)
INSERTでデータを登録しているのが以下の部分です。
# ===== データをINSERTする insert_sql = "INSERT INTO sample_table " \ "(str1, value1, last_update_datetime) " \ "VALUES(%s, %s, CURRENT_TIMESTAMP)" for i in range(5): input_str = f'test_str{i}' input_val = 10 * i bind = (input_str, input_val) db_mysql.execute_non_query(insert_sql, bind) # コミット db_mysql.commit()
今回の例では、5件データを追加しています。SQLでは、値を埋め込む部分を%sで記載し、バインド変数を用意して渡すことで入力値を変えつつINSERTしています。処理を確定する際には、commitメソッドを呼び出します。
なお、今回idはAUTO_INCREMENTが設定されているため自動で採番がされます。そのため、このプログラムは実行するたびに5件ずつデータが追加されていきます。
データの検索(SELECT)
SELECTで登録したデータを検索しているのが以下の部分です。今回は3つほど例を用意しています。
# ===== データを検索する (全て検索) print('===== 全件検索') select_sql = "SELECT * FROM sample_table" select_result = db_mysql.execute_query(select_sql) print(select_result) # ===== 件数を指定して検索する (以下の例は3件) print('===== 取得する件数の指定') select_sql = "SELECT * FROM sample_table" select_result = db_mysql.execute_query(select_sql, count=3) print(select_result) # ===== 条件を指定して検索する print('===== 条件を指定して実行する') select_sql = "SELECT * FROM sample_table WHERE value1 > %s" print('条件1') select_result = db_mysql.execute_query(select_sql, (10,)) print(select_result) print('条件2') select_result = db_mysql.execute_query(select_sql, (30,)) print(select_result)
全件検索する場合には「SELECT *」で検索すれば情報を取得できます。また、count引数に数字を指定すれば全件のうち指定した件数のみ取得できます。
条件を指定する場合には、WHERE句のあるSQLをそのまま渡してもいいですが、上記の例では条件の値を%sとして、バインド変数で値を変えながら実行しています。これにより条件に一致するデータのみ検索出来ていることが分かるかと思います。
以上が、今回作成してみたDBアクセスクラスの簡単な使い方です。mysqlclientを用いたMySQLデータベースへのアクセス方法として参考にしてみていただければと思います。
上記で紹介しているソースコードについてはgithubにて公開しています。参考にしていただければと思います。