【PySpark】Delta Lakeの使い方の基本

PySpark における Delta Lakeの使い方の基本 について解説します。
目次
レイクハウスと Delta Lake
レイクハウス (Lakehouse) は、データレイクとデータウェアハウスの機能を組み合わせた新しいデータ管理アーキテクチャとして近年注目されている概念です。レイクハウスを実現するための技術の一つとして Delta Lake というものがあります。
この記事では、PySpark での Delta Lake の使い方の基本について紹介します。
レイクハウスの概念については、この記事では説明しません。レイクハウスについては「データ管理技術の違いを理解する」を参考にしてください。
PySpark での Delta Lake の使い方の前に、まずは Delta Lake の概要を紹介します。Delta Lake について理解している方は読み飛ばしてください。
Delta Lake
Delta Lake は、Databricks 社によって開発されたもので、データレイク上でのACID 特性を提供し、スケーラブルで安全なデータ処理、管理を提供するための機能レイヤーとして作られました。基盤技術としては Apache Spark 上で動作し、Parquet 形式でデータを格納します。Delta Lake は、Databricks 社が開発していますがオープンソースプロジェクトであるため、Databricks 以外の環境でも使用できます。
データレイクは、多様なデータ形式を扱えますが管理が難しくなります。例えば、ファイルを同時にアクセスするとファイルを破壊する可能性があるなど、トランザクション処理には弱いという特徴があります。Delta Lake はデータレイク上で、データ整合性と信頼性を向上させるために多くの機能を提供してくれます。
Delta Lake の主な機能は以下のようなものがあります。
| 主な機能 | 概要 |
|---|---|
| ACID 特性のサポート | トランザクションで重要な、ACID 特性(A (Atomicity) : 原子性、C (Consistency) : 一貫性、I (Isolation) : 独立性、D (Durability) : 永続性)をサポートします。 |
| タイムトラベル (バージョン管理) | 過去の特定の時点のデータにアクセスするなど、データの変更履歴を追跡し、必要に応じて以前の状態に復元することができます。 |
| スキーマ進化のサポート | データのスキーマは時間とともに変化する可能性があります。代表的なものが、列の追加等です。 Delta Lake ではスキーマの進化をサポートしており、新しい列追加や変更を容易に行えます。 |
| メタデータの管理 | 大規模データセットにおける効率的なメタデータ管理を提供します。データの管理がスムーズにできます。 |
| パフォーマンスの最適化 | データの圧縮、パーティショニング、インデックス作成など、データアクセスのパフォーマンスを向上させます。 |
Delta Lake は、Databricks で使用されています。また、2023年11月に Microsoft から一般公開 (GA) が始まった Microsoft Fabric というデータ分析ソリューションでも規定ストレージ形式となっています。近年のデータ活用において理解しておくべき技術の 1 つです。
PySpark を用いた Delta Lake の使い方の基本
以降では、PySpark を用いた Delta Lake の使い方の基本を例を使って紹介します。
環境準備
本記事の前提環境としては「PySparkの実行環境をDockerで用意する方法」で説明している Docker での Jupyter Notebook 実行環境を使用します。この環境は、Delta Lake に関するモジュール (delta-spark) がインストールされていないため、個別にインストールが必要です。
実行時のインストール
delta-spark のインストールについて簡単な方法は、以下のように開発中の notebook のセルで pip install を実行する方法です。
!pip install delta-spark
「!」は、notebook 上でシステムコマンドを実行する際に使用するものです。上記により pip install を実行して delta-spark のインストールが可能です。
この方法はお手軽ですが、もともとの Docker 環境が変更されているわけではないので、Docker 環境を再起動した場合には delta-spark は消えてしまいます。
delta-spark を含む Docker 環境を作成する
もう 1 つの方法は、Spark 環境が準備されている jupyter/pyspark-notebook 環境を使いつつ delta-sparkをインストールした Docker 環境を作ってしまうことです。
テキストエディタで以下のように記載し、Dockerfile という名前で保存します。
FROM jupyter/pyspark-notebook # Delta Lakeのインストール RUN pip install delta-spark
上記の Dockerfile が保存されているフォルダで、以下コマンドを実行することで delta-spark がインストールされている環境を作ることができます。
docker build -t jupyter-pyspark-delta .
上記コマンドは、現在のディレクトリ(.)にある Dockerfile を使用して Docker イメージをビルドし、イメージに jupyter-pyspark-delta という名前を付けているということを意味します。
あとは、作成した環境を使って以下のように実行すると delta-spark を含む環境で実行することが可能です。
docker run --rm -it -p 8888:8888 -p 4040:4040 -v D:\docker\pyspark_notebook_1:/home/jovyan/work jupyter-pyspark-delta
上記で Delta Lake を使える Spark 環境を用意できます。
以降で、Spark セッションの作成から各種 Delta Lake の操作方法を紹介します。
Spark セッションの作成
Spark を実行するには、Spark セッションの作成が必要です。Spark セッションは以下のように生成できます。
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
# SparkSessionの初期化
builder = (
SparkSession.builder.appName("DeltaLakeExample")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
# Delta Lakeで動作するように設定
spark = configure_spark_with_delta_pip(builder).getOrCreate()Delta Lake を使用するには SparkSession.builder の中で config を使って "spark.sql.extensions" や "spark.sql.catalog.spark_catalog" に関する設定が必要です。
configure_spark_with_delta_pip() 関数は、Spark セッションで Delta Lake が動作するように設定するためのユーティリティ関数です。この関数を使用すると、Delta Lake をサポートするために必要な設定が Spark セッションに自動的に適用されます。
この処理の中で、Delta Lake と連携するための必要な依存関係の設定や Spark 設定を含めてセッションを初期化することができるため、Delta Lake を使用する環境設定が簡素化され、手動で多くの設定を行う必要がなくなるため便利です。
データの作成
以降の説明で使用するデータを以下のように作成しておきます。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# ===== データの作成
# スキーマを定義
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
])
# データを作成
data = [("Taro", 25), ("Hanako", 30), ("Yuki", 20)]
# スキーマを指定してデータフレームを生成
df = spark.createDataFrame(data, schema=schema)
# DataFrameの内容を表示
df.show()+------+---+ | name|age| +------+---+ | Taro| 25| |Hanako| 30| | Yuki| 20| +------+---+
DataFrame の作成方法は、この記事の本題ではないので詳細は割愛します。「DataFrameの作成方法とスキーマ」にデータ作成方法についてはまとめていますので必要に応じて参考にしてください。
データ入出力
Delta Lake に対する入出力について説明します。後の説明を見ていただければわかりますが、基本的には PySpark でのデータ入出力とほとんど同じです。PySpark での入出力は「DataFrameの入出力 read, write」を参考にしてください。
データの書き込み
データの書き込みは以下のように実行します。
# データの書き込み
(df.write
.format("delta")
.mode("overwrite")
.save("./delta-table")
)【出力結果】

DataFrame の write の使い方と基本は同じですが、format メソッドに "delta" を指定します。出力されたフォルダでは、上記出力結果例のように parquet ファイルで出力されます。
また、_delta_log というログフォルダが作成され、json ファイルが出力されます。このフォルダには、データ履歴等の各種情報が記録されます。
データの読み込み
データの読み込みは以下のように実行します。
# データの読み込み
df_read = (spark.read
.format("delta")
.load("./delta-table")
)
df_read.show()+------+---+ | name|age| +------+---+ |Hanako| 30| | Yuki| 20| | Taro| 25| +------+---+
DataFrame の read の使い方と基本は同じですが format メソッドに "delta" を指定し、読み込み対象パスを load で指定します。
タイムトラベル(バージョン管理)
Delta Lake の特徴として、タイムトラベルというバージョン管理機能があります。これにより、データの特定状態を確認したり、元に戻したりすることが可能になります。
タイムトラベルの確認のために、以下のようにデータフレームに行を追加して、変更をしておきます。
# データを更新する
new_row = spark.createDataFrame([("Jiro", 40)], schema)
# データフレームに追加
df = df.union(new_row)
df.show()
# データの更新
(df.write
.format("delta")
.mode("overwrite")
.save("./delta-table")
)+------+---+ | name|age| +------+---+ | Taro| 25| |Hanako| 30| | Yuki| 20| | Jiro| 40| +------+---+
バージョン情報の確認方法
タイムトラベルを利用するためにバージョン情報を確認します。扱っている Delta Lake のテーブル情報は、以下のように DeltaTable を使うことで確認ができます。
# ===== タイムトラベルを使用する
from delta import DeltaTable
# Delta Lakeの履歴情報を取得する
delta_table = DeltaTable.forPath(spark, "./delta-table")
history = delta_table.history()
# すべての履歴を表示
history.show(truncate=False)
# バージョンのみ選択して表示
history.select("version").show(truncate=False)
# タイムスタンプのみ選択して表示
history.select("timestamp").show(truncate=False)+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|version|timestamp |userId|userName|operation|operationParameters |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics |userMetadata|engineInfo |
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|1 |2023-12-09 10:09:36.239|NULL |NULL |WRITE |{mode -> Overwrite, partitionBy -> []}|NULL|NULL |NULL |0 |Serializable |false |{numFiles -> 5, numOutputRows -> 4, numOutputBytes -> 3158}|NULL |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
|0 |2023-12-09 10:09:30.767|NULL |NULL |WRITE |{mode -> Overwrite, partitionBy -> []}|NULL|NULL |NULL |NULL |Serializable |false |{numFiles -> 4, numOutputRows -> 3, numOutputBytes -> 2468}|NULL |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
+-------+
|version|
+-------+
|1 |
|0 |
+-------+
+-----------------------+
|timestamp |
+-----------------------+
|2023-12-09 10:09:36.239|
|2023-12-09 10:09:30.767|
+-----------------------+DeltaTable.forPath に、Spark セッション情報と Delta Lake の保存先パスを指定することで、テーブル情報が取得できます。取得したテーブル情報の history を使用することでヒストリー情報を取得することができます。
ヒストリー情報には version 情報や、timestamp 情報が含まれています。上記例でいうと、最初に作成して保存した DataFrame が version 0 で、先ほど行を追加して更新した DataFrame が version 1 になります。
これらの情報を使うことで、以降で紹介するように特定バージョンにおけるデータにアクセスすることが可能です。
特定のバージョンにアクセスする
特定バージョンを指定して履歴情報にアクセスする場合には、以下のようにします。
# 特定のバージョンを使用してアクセスする
df_version0 = spark.read.format("delta").option("versionAsOf", 0).load("./delta-table")
df_version0.show()
df_version1 = spark.read.format("delta").option("versionAsOf", 1).load("./delta-table")
df_version1.show()+------+---+ | name|age| +------+---+ |Hanako| 30| | Yuki| 20| | Taro| 25| +------+---+ +------+---+ | name|age| +------+---+ |Hanako| 30| | Jiro| 40| | Yuki| 20| | Taro| 25| +------+---+
特定バージョンを指定して読み込みを行うには、read の option で "versionAsOf" を使って「option("versionAsOf", 0)」のように指定します。ここで 0 がバージョン番号を指しています。
実際に取得した結果を見ると、version 0 は作成時のデータで、version 1 が先ほど更新したデータとなっているということが分かるかと思います。
特定のタイムスタンプでアクセスする
特定のタイムスタンプで指定して履歴情報にアクセスすることも可能です。タイムスタンプを使用する場合には、以下のようにします。
# タイムスタンプを使用してアクセスする
timestamp = "2023-12-09 10:09:30.767"
df_timestamp = spark.read.format("delta").option("timestampAsOf", timestamp).load("./delta-table")
df_timestamp.show()+------+---+ | name|age| +------+---+ |Hanako| 30| | Yuki| 20| | Taro| 25| +------+---+
特定のタイムスタンプを指定して読み込むには、read の option で "timestampAsOf" を使って「option("timestampAsOf", timestamp)」のように指定します。上記例では timestamp に version 0 のタイムスタンプ時刻を指定しています。
特定のバージョンへのロールバック
特定のバージョンにロールバックしたい場合は以下のようにします。
# データを特定のバージョンにロールバックする
# バージョンを指定する
rollback_version = 0
# 該当データを読み込む
df = spark.read.format("delta").option("versionAsOf", rollback_version).load("./delta-table")
# テーブルを上書きする
df.write.format("delta").mode("overwrite").save("./delta-table")
# Delta Lakeの履歴情報を取得する
delta_table = DeltaTable.forPath(spark, "./delta-table")
history = delta_table.history()
history.show(truncate=False)+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|version|timestamp |userId|userName|operation|operationParameters |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics |userMetadata|engineInfo |
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|2 |2023-12-09 10:09:51.892|NULL |NULL |WRITE |{mode -> Overwrite, partitionBy -> []}|NULL|NULL |NULL |1 |Serializable |false |{numFiles -> 3, numOutputRows -> 3, numOutputBytes -> 2084}|NULL |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
|1 |2023-12-09 10:09:36.239|NULL |NULL |WRITE |{mode -> Overwrite, partitionBy -> []}|NULL|NULL |NULL |0 |Serializable |false |{numFiles -> 5, numOutputRows -> 4, numOutputBytes -> 3158}|NULL |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
|0 |2023-12-09 10:09:30.767|NULL |NULL |WRITE |{mode -> Overwrite, partitionBy -> []}|NULL|NULL |NULL |NULL |Serializable |false |{numFiles -> 4, numOutputRows -> 3, numOutputBytes -> 2468}|NULL |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+やっていることは上記で紹介した特定のバージョン情報でデータを取得して、テーブルを上書きしているだけです。新しくできた version 2 が、version 0 の状態に戻したものになっています。
スキーマの進化
データのスキーマは時間とともに変化する可能性があります。Delta Lake ではスキーマを進化をサポートしており、新しい列追加や変更を容易に行えます。
デフォルトのスキーマ検証
Delta Lake では、スキーマの進化をサポートしますが、デフォルト設定では厳密なスキーマ検証が行われます。実際に列を追加して上書きしてみようとすると以下のようにエラーとなります。
from pyspark.sql.functions import lit
# Delta Lakeに列を追加する
df = df.withColumn("flg", lit(1))
df.show()
# テーブルを上書きする ←これはエラーになる
df.write.format("delta").mode("overwrite").save("./delta-table")+------+---+---+ | name|age|flg| +------+---+---+ |Hanako| 30| 1| | Taro| 25| 1| | Yuki| 20| 1| +------+---+---+ (...省略...) AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: 96d58398-afa8-49ea-9c62-1813e9a595d3). (...省略...)
上記例では schema mismatch というようにエラーが表示されます。
Delta Lake でスキーマの進化をサポートするには、明示的にスキーマ進化の設定を行う必要があり、大きく以下の 2 つの方法があります。
- 特定の書き込みにのみスキーマの進化を適用する
- セッションでスキーマの進化を有効化する
以降で、それぞれについてみていきましょう。
特定の書き込みにのみスキーマの進化を適用する
特定の書き込みにのみスキーマの進化を適用する場合には、以下のようにします。
# 特定の書き込みにのみスキーマの進化を適用したい場合
(df.write
.format("delta")
.option("mergeSchema", "true")
.mode("overwrite")
.save("./delta-table")
)スキーマの進化を適用するには write の際に option で「option("mergeSchema", "true")」のように "mergeSchema" を"true"に設定します。これにより、変更された列の情報などがスキーマに反映されます。
セッションでスキーマの進化を有効化
もう 1 つの方法として、以下のようにセッションに対してスキーマの進化を有効にする方法があります。
# スキーマの進化を有効化
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
# Delta Lakeに列を追加する
df = df.withColumn("country", lit("jpn"))
df.show()
# テーブルを上書きする
df.write.format("delta").mode("overwrite").save("./delta-table")+------+---+---+-------+ | name|age|flg|country| +------+---+---+-------+ |Hanako| 30| 1| jpn| | Taro| 25| 1| jpn| | Yuki| 20| 1| jpn| +------+---+---+-------+
セッションに対してスキーマの進化を有効にしたい場合は、上記のように「spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")」といった形で指定します。これにより、セッション全体に対してスキーマの進化を有効化することができます。
メタデータの管理
Delta Lake では、テーブルのメタデータの管理を容易にするメソッドなどが用意されています。上記で少し紹介した履歴情報もその一つです。以降で、主要なメタデータの確認方法について紹介します。
テーブル情報の取得
Delta Lake のテーブル情報は、以下のように DeltaTable で確認できます。
# テーブル情報の取得 delta_table = DeltaTable.forPath(spark, "./delta-table")
DeltaTable.forPath で、Spark セッションと Delta Lake の保存先パスを指定することで情報を取得できます。
テーブル情報の表示
テーブル情報の表示をしたい場合は、以下のように toDF を使って DataFrameを 取得できます。
# テーブル情報の表示 delta_table.toDF().show()
+------+---+---+-------+ | name|age|flg|country| +------+---+---+-------+ |Hanako| 30| 1| jpn| | Yuki| 20| 1| jpn| | Taro| 25| 1| jpn| +------+---+---+-------+
テーブルの履歴とバージョン
テーブル履歴とバージョン情報は、history により確認ができます。なお、truncate=False は、show の表示を省略しないようにするものです。
# テーブルの履歴とバージョン delta_table.history().show(truncate=False)
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|version|timestamp |userId|userName|operation|operationParameters |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics |userMetadata|engineInfo |
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|5 |2023-12-09 10:10:00.117|NULL |NULL |WRITE |{mode -> Overwrite, partitionBy -> []}|NULL|NULL |NULL |4 |Serializable |false |{numFiles -> 3, numOutputRows -> 3, numOutputBytes -> 3452}|NULL |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
|4 |2023-12-09 10:09:58.606|NULL |NULL |WRITE |{mode -> Overwrite, partitionBy -> []}|NULL|NULL |NULL |3 |Serializable |false |{numFiles -> 3, numOutputRows -> 3, numOutputBytes -> 3452}|NULL |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
|3 |2023-12-09 10:09:54.709|NULL |NULL |WRITE |{mode -> Overwrite, partitionBy -> []}|NULL|NULL |NULL |2 |Serializable |false |{numFiles -> 3, numOutputRows -> 3, numOutputBytes -> 3452}|NULL |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
|2 |2023-12-09 10:09:51.892|NULL |NULL |WRITE |{mode -> Overwrite, partitionBy -> []}|NULL|NULL |NULL |1 |Serializable |false |{numFiles -> 3, numOutputRows -> 3, numOutputBytes -> 2084}|NULL |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
|1 |2023-12-09 10:09:36.239|NULL |NULL |WRITE |{mode -> Overwrite, partitionBy -> []}|NULL|NULL |NULL |0 |Serializable |false |{numFiles -> 5, numOutputRows -> 4, numOutputBytes -> 3158}|NULL |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
|0 |2023-12-09 10:09:30.767|NULL |NULL |WRITE |{mode -> Overwrite, partitionBy -> []}|NULL|NULL |NULL |NULL |Serializable |false |{numFiles -> 4, numOutputRows -> 3, numOutputBytes -> 2468}|NULL |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+テーブルの詳細情報
Delta Lake のテーブルファイルの詳細情報を確認したい場合には、以下のようにSpark SQL を使用して情報を取得することが可能です。なお、truncate=False は、show の表示を省略しないようにするものです。
# テーブルのファイル情報
spark.sql("DESCRIBE DETAIL './delta-table'").show(truncate=False)+------+------------------------------------+----+-----------+----------------------------------------------------------------------------------------------------------+-----------------------+-----------------------+----------------+--------+-----------+----------+----------------+----------------+------------------------+
|format|id |name|description|location |createdAt |lastModified |partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|tableFeatures |
+------+------------------------------------+----+-----------+----------------------------------------------------------------------------------------------------------+-----------------------+-----------------------+----------------+--------+-----------+----------+----------------+----------------+------------------------+
|delta |38964833-dc9b-49a6-bfe5-b0c9142989fa|NULL|NULL |file:/home/jovyan/work/python-tech-sample-source/python-data-analysis/pyspark/delta-lake-basic/delta-table|2023-12-09 10:09:28.235|2023-12-09 10:10:00.117|[] |3 |3452 |{} |1 |2 |[appendOnly, invariants]|
+------+------------------------------------+----+-----------+----------------------------------------------------------------------------------------------------------+-----------------------+-----------------------+----------------+--------+-----------+----------+----------------+----------------+------------------------+これにより、Delta Lake のテーブル詳細情報を確認することができます。
まとめ
PySpark における Delta Lakeの使い方の基本 について解説しました。
Delta Lake は、データレイク上での ACID 特性を提供し、スケーラブルで安全なデータ処理、管理を提供するための機能レイヤーとして開発されています。具体的には、ACID 特性のサポート、タイムトラベル (バージョン管理)、スキーマ進化のサポート、メタデータ管理、パフォーマンスの最適化などを多くの機能を提供してくれます。
この記事では、PySpark を使って Delta Lake を扱う基本的な使い方を紹介しました。Delta Lake は、Databricks や Microsoft Fabric といったデータ基盤でも活用されています。ぜひ、使い方を理解してもらえたらと思います。
上記で紹介しているソースコードについては GitHub にて公開しています。参考にしていただければと思います。







