PySpark

【PySpark】Delta Lakeの使い方の基本

【PySpark】Delta Lakeの使い方の基本

PySparkにおけるDelta Lakeの使い方の基本について解説します。

レイクハウスとDelta Lake

レイクハウス(Lakehouse)は、データレイクとデータウェアハウスの機能を組み合わせた新しいデータ管理アーキテクチャとして近年注目されている概念です。レイクハウスを実現するための技術の一つとしてDelta Lakeというものがあります。

本記事では、PySparkでのDelta Lakeの使い方の基本について紹介します。

レイクハウスの概念については本記事では説明しません。レイクハウスについては「データ管理技術の違いを理解する」でまとめています。データベース、データウェアハウス、データレイク、データマートといった考え方も含めてまとめていますので興味があれば参考にしてください。

PySparkでのDelta Lakeの使い方の前に、まずはDelta Lakeの概要を紹介します。Delta Lakeについて理解している方は読み飛ばしてください。

Delta Lake

Delta Lakeは、Databricks社によって開発されたもので、データレイク上でのACID特性を提供し、スケーラブルで安全なデータ処理、管理を提供するための機能レイヤーとして作られました。基盤技術としてはApache Spark上で動作し、Parquet形式でデータを格納します。Delta Lakeは、Databricks社が開発していますがオープンソースプロジェクトであるため、Databricks以外の環境でも使用できます。

データレイクは、多様なデータ形式を扱えますが管理が難しくなります。例えば、ファイルを同時にアクセスするとファイルを破壊する可能性があるなど、トランザクション処理には弱いという特徴があります。Delta Lakeはデータレイク上で、データ整合性と信頼性を向上させるために多くの機能を提供してくれます。

Delta Lakeの主な機能は以下のようなものがあります。

主な機能概要
ACID特性のサポートトランザクションで重要な、ACID特性(A(Atomicity):原子性、C(Consistency):一貫性、I(Isolation):独立性、D(Durability)永続性)をサポートします。
タイムトラベル
(バージョン管理)
過去の特定の時点のデータにアクセスするなど、データの変更履歴を追跡し、必要に応じて以前の状態に復元することができます。
スキーマ進化のサポートデータのスキーマは時間とともに変化する可能性があります。代表的なものが、列の追加等です。
Delta Lakeではスキーマの進化をサポートしており、新しい列追加や変更を容易に行えます。
メタデータの管理大規模データセットにおける効率的なメタデータ管理を提供します。データの管理がスムーズにできます。
パフォーマンスの最適化データの圧縮、パーティショニング、インデックス作成など、データアクセスのパフォーマンスを向上させます。

Delta Lakeは、Databricksで使用されています。また、2023年11月にMicrosoftから一般公開(GA)が始まったMicrosoft Fabricというデータ分析ソリューションでも規定ストレージ形式となっています。近年のデータ活用において理解しておくべき技術の一つです。

PySparkを用いたDelta Lakeの使い方の基本

以降では、PySparkを用いたDelta Lakeの使い方の基本を例を使って紹介します。

環境準備

本記事の前提環境としては「PySparkの実行環境をDockerで用意する方法」で説明しているDockerでのJupyter Notebook実行環境を使用します。この環境は、Delta Lakeに関するモジュール(delta-spark)がインストールされていないため、個別にインストールが必要です。

実行時のインストール

delta-sparkのインストールについて簡単な方法は、以下のように開発中のnotebookのセルでpip installを実行する方法です。

!pip install delta-spark

「!」は、notebook上でシステムコマンドを実行する際に使用するものです。上記によりpip installを実行してdelta-sparkのインストールが可能です。

この方法はお手軽ですが、もともとのDocker環境が変更されているわけではないので、Docker環境を再起動した場合には、delta-sparkは消えてしまいます。

delta-sparkを含むDocker環境を作成する

もう一つの方法は、Spark環境が準備されているjupyter/pyspark-notebook環境を使いつつ、delta-sparkをインストールしたDocker環境を作ってしまうことです。

まずは、テキストエディタで以下のように記載し、Dockerfileという名前で保存します。

FROM jupyter/pyspark-notebook

# Delta Lakeのインストール
RUN pip install delta-spark

上記のDockerfileが保存されているフォルダで、以下コマンドを実行することでdelta-sparkがインストールされている環境を作ることができます。

docker build -t jupyter-pyspark-delta .

上記コマンドは、現在のディレクトリ(.)にあるDockerfileを使用してDockerイメージをビルドし、イメージにjupyter-pyspark-deltaという名前を付けているということを意味します。

あとは、作成した環境を使って以下のように実行するとdelta-sparkを含む環境で実行することが可能です。

docker run --rm -it -p 8888:8888 -p 4040:4040 -v D:\docker\pyspark_notebook_1:/home/jovyan/work jupyter-pyspark-delta

上記でDelta Lakeを使えるSpark環境を用意できます。

以降で、Sparkセッションの作成から各種Delta Lakeの操作方法を紹介していきます。

Sparkセッションの作成

Sparkを実行するには、Sparkセッションの作成が必要です。Sparkセッションは以下のように生成できます。

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# SparkSessionの初期化
builder = (
    SparkSession.builder.appName("DeltaLakeExample")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
# Delta Lakeで動作するように設定
spark = configure_spark_with_delta_pip(builder).getOrCreate()

Delta Lakeを使用するには、SparkSession.builderの中でconfigを使って"spark.sql.extensions""spark.sql.catalog.spark_catalog"に関する設定が必要です。

configure_spark_with_delta_pip()関数は、SparkセッションでDelta Lakeが動作するように設定するためのユーティリティ関数です。この関数を使用すると、Delta Lakeをサポートするために必要な設定がSparkセッションに自動的に適用されます。

この処理の中で、Delta Lakeと連携するための必要な依存関係の設定やSpark設定を含めてセッションを初期化することができるため、Delta Lakeを使用する環境設定が簡素化され、手動で多くの設定を行う必要がなくなるため便利です。

データの作成

以降の説明で使用するデータを以下のように作成しておきます。

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# ===== データの作成
# スキーマを定義
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

# データを作成
data = [("Taro", 25), ("Hanako", 30), ("Yuki", 20)]

# スキーマを指定してデータフレームを生成
df = spark.createDataFrame(data, schema=schema)

# DataFrameの内容を表示
df.show()
+------+---+
|  name|age|
+------+---+
|  Taro| 25|
|Hanako| 30|
|  Yuki| 20|
+------+---+

DataFrameの作成方法は、本記事の本題ではないので詳細は割愛します。「DataFrameの作成方法とスキーマ」にデータ作成方法についてはまとめていますので必要に応じて参考にしてください。

データ入出力

Delta Lakeに対する入出力について説明します。後の説明を見ていただければわかりますが、基本的にはPySparkでのデータ入出力とほとんど同じです。PySparkでの入出力は「DataFrameの入出力 read, write」でまとめていますので興味があれば参考にしてください。

データの書き込み

データの書き込みは以下のように実行します。

# データの書き込み
(df.write
    .format("delta")
    .mode("overwrite")
    .save("./delta-table")
)

【出力結果】

DataFrameのwriteの使い方と基本は同じですが、formatメソッドに"delta"を指定します。出力されたフォルダでは、上記出力結果例のようにparquetファイルで出力がされます。

また、_delta_logというログフォルダが作成され、jsonファイルが出力されます。このフォルダには、データ履歴等の各種情報が記録されます。

Note

上記例の出力ファイルが複数になることに疑問に思われる方がいるかもしれません。これはDelta Lakeの基盤で動いているSparkが分散された実行ノードで実行され、それぞれのノードがファイルを出力しているためです。Sparkでは、ファイル統合は負荷が高い処理のため、それを避けてそれぞれのノードが出力をしています。Sparkアプリケーションの概念については「Sparkアプリケーションの概念を理解する」でまとめていますので興味があれば参考にしてください。

なお、ファイルを統合して出力したい場合は、coalescerepartitionといった方法でまとめることも可能です。「DataFrameの入出力 read, write」内にcoalescerepartitionについても説明していますので、あわせてご確認ください。ただし、ファイル統合は負荷が高くなる処理であるため実行タイミングは十分に注意して検討してください。

データの読み込み

データの読み込みは以下のように実行します。

# データの読み込み
df_read = (spark.read
                .format("delta")
                .load("./delta-table")
)
df_read.show()
+------+---+
|  name|age|
+------+---+
|Hanako| 30|
|  Yuki| 20|
|  Taro| 25|
+------+---+

DataFrameのreadの使い方と基本は同じですが、formatメソッドに"delta"を指定し、読み込み対象パスをloadで指定します。

タイムトラベル(バージョン管理)

Delta Lakeの特徴として、タイムトラベルというバージョン管理機能があります。これにより、データの特定状態を確認したり、元に戻したりすることが可能になります。

タイムトラベルの確認のために、以下のようにデータフレームに行を追加して、変更をしておきます。

# データを更新する
new_row = spark.createDataFrame([("Jiro", 40)], schema)

# データフレームに追加
df = df.union(new_row)
df.show()

# データの更新
(df.write
    .format("delta")
    .mode("overwrite")
    .save("./delta-table")
)
+------+---+
|  name|age|
+------+---+
|  Taro| 25|
|Hanako| 30|
|  Yuki| 20|
|  Jiro| 40|
+------+---+

バージョン情報の確認方法

タイムトラベルを利用するためにバージョン情報を確認します。扱っているDelta Lakeのテーブル情報は、以下のようにDeltaTableを使うことで確認ができます。

# ===== タイムトラベルを使用する
from delta import DeltaTable

# Delta Lakeの履歴情報を取得する
delta_table = DeltaTable.forPath(spark, "./delta-table")
history = delta_table.history()

# すべての履歴を表示
history.show(truncate=False)
# バージョンのみ選択して表示
history.select("version").show(truncate=False)
# タイムスタンプのみ選択して表示
history.select("timestamp").show(truncate=False)
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation|operationParameters                   |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                           |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|1      |2023-12-09 10:09:36.239|NULL  |NULL    |WRITE    |{mode -> Overwrite, partitionBy -> []}|NULL|NULL    |NULL     |0          |Serializable  |false        |{numFiles -> 5, numOutputRows -> 4, numOutputBytes -> 3158}|NULL        |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
|0      |2023-12-09 10:09:30.767|NULL  |NULL    |WRITE    |{mode -> Overwrite, partitionBy -> []}|NULL|NULL    |NULL     |NULL       |Serializable  |false        |{numFiles -> 4, numOutputRows -> 3, numOutputBytes -> 2468}|NULL        |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+

+-------+
|version|
+-------+
|1      |
|0      |
+-------+

+-----------------------+
|timestamp              |
+-----------------------+
|2023-12-09 10:09:36.239|
|2023-12-09 10:09:30.767|
+-----------------------+

DeltaTable.forPathに、Sparkセッション情報とDelta Lakeの保存先パスを指定することで、テーブル情報が取得できます。取得したテーブル情報のhistoryを使用することでヒストリー情報を取得することができます。

ヒストリー情報にはversion情報や、timestamp情報が含まれています。上記例でいうと、最初に作成して保存したDataFrameがversion 0で、先ほど行を追加して更新したDataFrameがversion 1になります。

これらの情報を使うことで、以降で紹介するように特定バージョンにおけるデータにアクセスすることが可能です。

特定のバージョンにアクセスする

特定バージョンを指定して履歴情報にアクセスする場合には、以下のようにします。

# 特定のバージョンを使用してアクセスする
df_version0 = spark.read.format("delta").option("versionAsOf", 0).load("./delta-table")
df_version0.show()

df_version1 = spark.read.format("delta").option("versionAsOf", 1).load("./delta-table")
df_version1.show()
+------+---+
|  name|age|
+------+---+
|Hanako| 30|
|  Yuki| 20|
|  Taro| 25|
+------+---+

+------+---+
|  name|age|
+------+---+
|Hanako| 30|
|  Jiro| 40|
|  Yuki| 20|
|  Taro| 25|
+------+---+

特定バージョンを指定して読み込みを行うには、readoption"versionAsOf"を使って「option("versionAsOf", 0)」のように指定します。ここで0がバージョン番号を指しています。

実際に取得した結果を見ると、version 0は作成時のデータで、version 1が先ほど更新したデータとなっているということが分かるかと思います。

特定のタイムスタンプでアクセスする

特定のタイムスタンプで指定して履歴情報にアクセスすることも可能です。タイムスタンプを使用する場合には、以下のようにします。

# タイムスタンプを使用してアクセスする
timestamp = "2023-12-09 10:09:30.767"

df_timestamp = spark.read.format("delta").option("timestampAsOf", timestamp).load("./delta-table")
df_timestamp.show()
+------+---+
|  name|age|
+------+---+
|Hanako| 30|
|  Yuki| 20|
|  Taro| 25|
+------+---+

特定のタイムスタンプを指定して読み込みを行うには、readoption"timestampAsOf"を使って「option("timestampAsOf", timestamp)」のように指定します。上記例では、timestampにversion 0のタイムスタンプ時刻を指定しています。

特定のバージョンへのロールバック

特定のバージョンにロールバックしたい場合は以下のようにします。

# データを特定のバージョンにロールバックする
# バージョンを指定する
rollback_version = 0

# 該当データを読み込む
df = spark.read.format("delta").option("versionAsOf", rollback_version).load("./delta-table")

# テーブルを上書きする
df.write.format("delta").mode("overwrite").save("./delta-table")

# Delta Lakeの履歴情報を取得する
delta_table = DeltaTable.forPath(spark, "./delta-table")
history = delta_table.history()
history.show(truncate=False)
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation|operationParameters                   |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                           |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|2      |2023-12-09 10:09:51.892|NULL  |NULL    |WRITE    |{mode -> Overwrite, partitionBy -> []}|NULL|NULL    |NULL     |1          |Serializable  |false        |{numFiles -> 3, numOutputRows -> 3, numOutputBytes -> 2084}|NULL        |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
|1      |2023-12-09 10:09:36.239|NULL  |NULL    |WRITE    |{mode -> Overwrite, partitionBy -> []}|NULL|NULL    |NULL     |0          |Serializable  |false        |{numFiles -> 5, numOutputRows -> 4, numOutputBytes -> 3158}|NULL        |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
|0      |2023-12-09 10:09:30.767|NULL  |NULL    |WRITE    |{mode -> Overwrite, partitionBy -> []}|NULL|NULL    |NULL     |NULL       |Serializable  |false        |{numFiles -> 4, numOutputRows -> 3, numOutputBytes -> 2468}|NULL        |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+

やっていることは上記で紹介した特定のバージョン情報でデータを取得して、テーブルを上書きしているだけです。新しくできたversion 2が、version 0の状態に戻したものになっています。

スキーマの進化

データのスキーマは時間とともに変化する可能性があります。Delta Lakeではスキーマを進化をサポートしており、新しい列追加や変更を容易に行えます。

デフォルトのスキーマ検証

Delta Lakeでは、スキーマの進化をサポートしますが、デフォルト設定では厳密なスキーマ検証が行われます。実際に列を追加して上書きしてみようとすると以下のようにエラーとなります。

from pyspark.sql.functions import lit

# Delta Lakeに列を追加する
df = df.withColumn("flg", lit(1))
df.show()

# テーブルを上書きする ←これはエラーになる
df.write.format("delta").mode("overwrite").save("./delta-table")
+------+---+---+
|  name|age|flg|
+------+---+---+
|Hanako| 30|  1|
|  Taro| 25|  1|
|  Yuki| 20|  1|
+------+---+---+

(...省略...)
AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: 96d58398-afa8-49ea-9c62-1813e9a595d3).
(...省略...)

上記例では、schema mismatchというようにエラーが表示されます。

Delta Lakeでスキーマの進化をサポートするには、明示的にスキーマ進化の設定を行う必要があり、大きく以下の2つの方法があります。

  1. 特定の書き込みにのみスキーマの進化を適用する
  2. セッションでスキーマの進化を有効化する

以降で、それぞれについてみていきましょう。

特定の書き込みにのみスキーマの進化を適用する

特定の書き込みにのみスキーマの進化を適用する場合には、以下のようにします。

# 特定の書き込みにのみスキーマの進化を適用したい場合
(df.write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .save("./delta-table")
)

スキーマの進化を適用するには、writeの際にoptionで「option("mergeSchema", "true")」といったように、"mergeSchema""true"に設定します。これにより、変更された列の情報などがスキーマに反映されます。

セッションでスキーマの進化を有効化

もう一つの方法として、以下のようにセッションに対してスキーマの進化を有効にする方法があります。

# スキーマの進化を有効化
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# Delta Lakeに列を追加する
df = df.withColumn("country", lit("jpn"))
df.show()

# テーブルを上書きする
df.write.format("delta").mode("overwrite").save("./delta-table")
+------+---+---+-------+
|  name|age|flg|country|
+------+---+---+-------+
|Hanako| 30|  1|    jpn|
|  Taro| 25|  1|    jpn|
|  Yuki| 20|  1|    jpn|
+------+---+---+-------+

セッションに対してスキーマの進化を有効にしたい場合は、上記のように「spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")」といった形で指定します。

これにより、セッション全体に対してスキーマの進化を有効化することができます。

スキーマの進化を有効にする場合は、予期せぬスキーマの変更が発生しないよう注意する必要があります。特に、大規模なデータセットや重要データを扱う場合には、スキーマの変更には慎重に対処することが推奨されます。

メタデータの管理

Delta Lakeでは、テーブルのメタデータの管理を容易にするメソッドなどが用意されています。上記で少し紹介した履歴情報もその一つです。以降で、主要なメタデータの確認方法について紹介します。

テーブル情報の取得

Delta Lakeのテーブルの情報は、以下のようにDeltaTableを使うことで確認ができます。

# テーブル情報の取得
delta_table = DeltaTable.forPath(spark, "./delta-table")

DeltaTable.forPathで、SparkセッションとDelta Lakeの保存先パスを指定することで情報を取得できます。

テーブル情報の表示

テーブル情報の表示をしたい場合は、以下のようにtoDFを使ってDataFrameを取得することができます。

# テーブル情報の表示
delta_table.toDF().show()
+------+---+---+-------+
|  name|age|flg|country|
+------+---+---+-------+
|Hanako| 30|  1|    jpn|
|  Yuki| 20|  1|    jpn|
|  Taro| 25|  1|    jpn|
+------+---+---+-------+

テーブルの履歴とバージョン

テーブルの履歴とバージョン情報は、以下のようにhistoryを使用して確認することができます。なお、truncate=Falseは、showの表示を省略しないようにするものです。

# テーブルの履歴とバージョン
delta_table.history().show(truncate=False)
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation|operationParameters                   |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                           |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|5      |2023-12-09 10:10:00.117|NULL  |NULL    |WRITE    |{mode -> Overwrite, partitionBy -> []}|NULL|NULL    |NULL     |4          |Serializable  |false        |{numFiles -> 3, numOutputRows -> 3, numOutputBytes -> 3452}|NULL        |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
|4      |2023-12-09 10:09:58.606|NULL  |NULL    |WRITE    |{mode -> Overwrite, partitionBy -> []}|NULL|NULL    |NULL     |3          |Serializable  |false        |{numFiles -> 3, numOutputRows -> 3, numOutputBytes -> 3452}|NULL        |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
|3      |2023-12-09 10:09:54.709|NULL  |NULL    |WRITE    |{mode -> Overwrite, partitionBy -> []}|NULL|NULL    |NULL     |2          |Serializable  |false        |{numFiles -> 3, numOutputRows -> 3, numOutputBytes -> 3452}|NULL        |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
|2      |2023-12-09 10:09:51.892|NULL  |NULL    |WRITE    |{mode -> Overwrite, partitionBy -> []}|NULL|NULL    |NULL     |1          |Serializable  |false        |{numFiles -> 3, numOutputRows -> 3, numOutputBytes -> 2084}|NULL        |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
|1      |2023-12-09 10:09:36.239|NULL  |NULL    |WRITE    |{mode -> Overwrite, partitionBy -> []}|NULL|NULL    |NULL     |0          |Serializable  |false        |{numFiles -> 5, numOutputRows -> 4, numOutputBytes -> 3158}|NULL        |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
|0      |2023-12-09 10:09:30.767|NULL  |NULL    |WRITE    |{mode -> Overwrite, partitionBy -> []}|NULL|NULL    |NULL     |NULL       |Serializable  |false        |{numFiles -> 4, numOutputRows -> 3, numOutputBytes -> 2468}|NULL        |Apache-Spark/3.5.0 Delta-Lake/3.0.0|
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+

テーブルの詳細情報

Delta Lakeのテーブルファイルの詳細情報を確認したい場合には、以下のようにSpark SQLを使用して情報を取得することが可能です。なお、truncate=Falseは、showの表示を省略しないようにするものです。

# テーブルのファイル情報
spark.sql("DESCRIBE DETAIL './delta-table'").show(truncate=False)
+------+------------------------------------+----+-----------+----------------------------------------------------------------------------------------------------------+-----------------------+-----------------------+----------------+--------+-----------+----------+----------------+----------------+------------------------+
|format|id                                  |name|description|location                                                                                                  |createdAt              |lastModified           |partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|tableFeatures           |
+------+------------------------------------+----+-----------+----------------------------------------------------------------------------------------------------------+-----------------------+-----------------------+----------------+--------+-----------+----------+----------------+----------------+------------------------+
|delta |38964833-dc9b-49a6-bfe5-b0c9142989fa|NULL|NULL       |file:/home/jovyan/work/python-tech-sample-source/python-data-analysis/pyspark/delta-lake-basic/delta-table|2023-12-09 10:09:28.235|2023-12-09 10:10:00.117|[]              |3       |3452       |{}        |1               |2               |[appendOnly, invariants]|
+------+------------------------------------+----+-----------+----------------------------------------------------------------------------------------------------------+-----------------------+-----------------------+----------------+--------+-----------+----------+----------------+----------------+------------------------+

これにより、Delta Lakeのテーブル詳細情報を確認することができます。

まとめ

PySparkにおけるDelta Lakeの使い方の基本について解説しました。

Delta Lakeは、データレイク上でのACID特性を提供し、スケーラブルで安全なデータ処理、管理を提供するための機能レイヤーとして開発されています。具体的には、ACID特性のサポート、タイムトラベル(バージョン管理)、スキーマ進化のサポート、メタデータ管理、パフォーマンスの最適化などを多くの機能を提供してくれます。

本記事では、PySparkを使ってDelta Lakeを扱う基本的な使い方を紹介しました。Delta Lakeは、Databricksや最近公開されたMicrosoft Fabric等でも活用されている技術です。ぜひ、使い方を理解してもらえると良いかと思います。