【PySpark】日付フォルダで分割してデータを入出力する方法 partitionBy

PySpark でデータを保存する際に、日付フォルダで分割してデータを入出力する方法を解説します。
目次
データを日付フォルダに分割して保存
PySpark で DataFrame の入出力を行う際は write やread メソッドを使用します。PySpark での DataFrame の基本的な入出力方法については「DataFrameの入出力 read, write」を参考にしてください。
データレイク(例:Amazon S3、Azure ADLS等)において、特に大量のデータを管理している際、日付フォルダにデータを分割して保存することは、クエリのパフォーマンス向上などの観点で重要な方法です。
この記事では、PySpark において日付フォルダでデータを効率的に分割して入出力する方法を紹介します。
データのフォルダ分割と保存:メリットと注意点
データをフォルダ分割して保存して扱うメリットや注意するべき点を紹介します。考え方を十分理解されている方は読み飛ばしていただいて構いません。
メリット
データレイクでデータを扱う場合、効率的に管理できるようフォルダ構成をよく検討することが重要です。扱うデータの特徴により適切な構成は異なりますが、例えば、時系列データ保存では、日付ごとフォルダを分ける方法はベストプラクティスの 1 つです。
日付ごとフォルダを分けてデータ管理することは、以下のようなメリットがあります。
| 項目 | 内容 |
|---|---|
| クエリのパフォーマンス向上 | 日付ごとにデータを分けることで、特定の日付範囲のクエリの実行時間を短縮できます。 |
| データ整理/管理の簡素化 | 日付ごとにデータを分けることで、データの所在が明確になり、管理が容易になります。 |
| スケーラビリティの向上 | 大規模データセットの効率的な分散処理が可能になります。 |
| データ追加/更新の容易さ | 特定の日付データのみを効率的に追加・更新できます。 |
| コスト削減 | データ取得量で従量課金されるクラウドサービスを使用する場合に、不要なデータ取得を避けることでコストを削減できます。 |
注意するべき点
分割単位について十分な検討が必要
フォルダ分割単位は 年、月、日、時間などいくつか検討できます。どういった単位で分割するかは、その後のデータ活用方法に応じて十分検討するべきです。いくつかの一般的なケースと階層分けの考え方を紹介します。
- 年単位(year=YYYY/):年単位でデータ集計したい場合に適しています。
- 月単位(year=YYYY/month=MM/):月単位でデータ集計したい場合に適しています。BI では、各種データを月集計するケースが多いため、このような時に向いています。
- 日単位(year=YYYY/month=MM/day=DD/):日単位でデータ集計したい場合に適しています。
最適な分割方法を選択する
日付フォルダ分割は、一般的に時系列データに適していますが、すべてのデータに最適とは限りません。日付分割ではなく、他の目的に沿った意味のフォルダで整理した方がよい場合もあります。
また、どの階層まで分割するかも検討が必要です。例えば、データ量が多い時に分割単位を細かくしすぎると小さなファイルが多数生成される可能性があり、ファイルシステムに負荷をかける可能性があります。逆に、分割単位が大きい場合、読み込むファイルサイズが大きくなり、パフォーマンスに影響が起こる可能性もあります。
取り扱うデータの特性や分析の目的に応じて、パフォーマンスと効率のバランスを十分に検討することが重要です。
PySpark で日付フォルダに分割してデータを入出力する方法
PySpark を使用して日付に基づいてデータを効率的に分割し、管理するための実践的な手法を順を追って説明します。
使用するデータの用意
使用するサンプルデータを以下のコードにより生成します。このデータは、2023/10/1 ~ 2024/3/31 までで 1 時間毎に収集されるデータとして、sin 波にノイズを加えた時系列データを想定しています。例えば、ある設備の計測器からのデータと考えると分かりやすいでしょう。
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# サンプル時系列データの生成 (1時間毎データ)
h_range = pd.date_range(start="2023-10-01", end="2024-03-31 23:00:00", freq="H")
hours = np.arange(len(h_range))
# サンプルデータを生成 (週周期で時間単位のsin波)
sin_wave = np.sin(hours / (7 * 24))
# ランダムノイズを生成
noise = np.random.normal(0, 0.3, len(h_range))
# 波形にランダムノイズを付与
measurement_values = sin_wave + noise
# 描画
plt.figure(figsize=(12, 6))
plt.plot(h_range, measurement_values, label='Measurement with Noise')
plt.title('Hourly Measurement Values Over Time')
plt.xlabel('Date and Time')
plt.ylabel('Measurement Value')
plt.legend()
plt.grid(True)
plt.show()描画結果

結果として生成されるデータは上記のようになります。以降の例では、このデータを日付をベースに分割してデータを入出力する例を説明していきます。
日付フォルダでデータを分割して出力する
データを日付フォルダに分割して保存する場合は以下のように write.partitionBy を使用します。なお、write の基本的な使い方が分からない方は「DataFrameの入出力 read, write」を参照してください。
from pyspark.sql import SparkSession
from pyspark.sql.functions import dayofmonth, month, year
# Sparkセッションの初期化
spark = SparkSession.builder.appName("partition_example").getOrCreate()
# pandasのDataFrameを作成
pandas_df = pd.DataFrame({
"datetime": h_range,
"measurement": measurement_values
}
)
# Spark DataFrameに変換
df = spark.createDataFrame(pandas_df)
# 年・月・日の列を作成しておく
df = df.withColumn("year", year("datetime"))
df = df.withColumn("month", month("datetime"))
df = df.withColumn("day", dayofmonth("datetime"))
df.show()
# 年単位でパーティション分割して保存
df.write.mode("overwrite").partitionBy("year").parquet("./yearly")
# 月単位でパーティション分割して保存
df.write.mode("overwrite").partitionBy("year", "month").parquet("./monthly")
# 日単位でパーティション分割して保存
df.write.mode("overwrite").partitionBy("year", "month", "day").parquet("./daily")上記プログラムでは、まず pandas の DataFrame を作成し、それを Spark の DataFrame に変換しています。その後に datetime 列から year, month, day の情報を抽出して列を追加し、これらを基にデータを分割して保存します。
ファイルを書き込む write.partitionBy で基準とする列名を指定します。年月日で階層構造を作って作成したい場合は「, (カンマ)」で区切って階層順に列を指定します。
結果としては年は「year=YYYY」、月は「month=MM」、日は「day=DD」という形式のフォルダが作成され、各フォルダに parquet ファイルが出力されます。出力ファイル数は Spark ノードに依存するため複数ファイル出力される場合があります。
年単位フォルダでの保存

月単位フォルダでの保存

日単位フォルダでの保存

データを読み込む方法
日付フォルダで分割したファイルを読み込む際には read メソッドを使用します。いくつかのパターンでの読み込み例を紹介します。
特定のフォルダを指定して読み込む場合
特定の年月のフォルダを指定して読み込む場合は、以下のように対象フォルダを read.parquet に指定します。
# 特定フォルダを指定して読み込む場合
df_read = spark.read.parquet("./monthly/year=2023/month=10")
# ソートしてから表示
df_read = df_read.sort(df_read.datetime)
df_read.show(5)
df_read.orderBy(df_read.datetime, ascending=False).show(5)【実行結果】 +-------------------+--------------------+---+ | datetime| measurement|day| +-------------------+--------------------+---+ |2023-10-01 00:00:00| 0.14901424590336979| 1| |2023-10-01 01:00:00|-0.03552694454855405| 1| |2023-10-01 02:00:00| 0.20621104213982733| 1| |2023-10-01 03:00:00| 0.4747651507543513| 1| |2023-10-01 04:00:00|-0.04643873812079383| 1| +-------------------+--------------------+---+ only showing top 5 rows +-------------------+-------------------+---+ | datetime| measurement|day| +-------------------+-------------------+---+ |2023-10-31 23:00:00|-0.9504823233260842| 31| |2023-10-31 22:00:00|-1.1440374606662784| 31| |2023-10-31 21:00:00|-1.3387133335934447| 31| |2023-10-31 20:00:00|-1.5053170384963321| 31| |2023-10-31 19:00:00| -0.64692450768512| 31| +-------------------+-------------------+---+ only showing top 5 rows
上記では、読み込んだ先頭と末尾の 5 行を出力していますが、指定した月のファイルが読み込めていることが分かるかと思います。月の例で紹介していますが、年や日の分割をした場合についても同様です。
フォルダ配下をまとめて読み込む場合
例えば、年・月で階層化したフォルダ構成にしていた場合に、年のフォルダ配下にあるすべての月フォルダをまとめて読み込みたい場合があります。このような場合には、以下のように年フォルダを指定します。
# フォルダ配下をまとめて読み込む場合 (以下どちらでも同じ)
df_read = spark.read.parquet("./monthly/year=2023")
# df_read = spark.read.parquet("./monthly/year=2023/*")
# ソートしてから表示
df_read = df_read.sort(df_read.datetime)
df_read.show(5)
df_read.orderBy(df_read.datetime, ascending=False).show(5)【実行結果】 +-------------------+--------------------+---+-----+ | datetime| measurement|day|month| +-------------------+--------------------+---+-----+ |2023-10-01 00:00:00| 0.14901424590336979| 1| 10| |2023-10-01 01:00:00|-0.03552694454855405| 1| 10| |2023-10-01 02:00:00| 0.20621104213982733| 1| 10| |2023-10-01 03:00:00| 0.4747651507543513| 1| 10| |2023-10-01 04:00:00|-0.04643873812079383| 1| 10| +-------------------+--------------------+---+-----+ only showing top 5 rows +-------------------+-------------------+---+-----+ | datetime| measurement|day|month| +-------------------+-------------------+---+-----+ |2023-12-31 23:00:00| 0.6467656757056162| 31| 12| |2023-12-31 22:00:00| 0.9802973425873306| 31| 12| |2023-12-31 21:00:00| 0.6717463226887521| 31| 12| |2023-12-31 20:00:00|0.23803665242220395| 31| 12| |2023-12-31 19:00:00| 0.1319102042628495| 31| 12| +-------------------+-------------------+---+-----+ only showing top 5 rows
年フォルダ配下をまとめて指定する場合は、上記のように "./monthly/year=2023" といった形で年フォルダを指定します。また、ワイルドカードでの読み込みもできるので "./monthly/year=2023/*" としても同様です。
他のサブディレクトリやファイルがある場合
フォルダ配下をまとめて指定する場合で、他のサブディレクトリやファイルがある場合には、ワイルドカードを使用できます。例えば、任意の月フォルダを month=* のようにワイルドカード指定するとフォルダ配下の全ての月のデータを取得できます。
# フォルダ配下をまとめて読み込む場合 (他のサブディレクトリやファイルがある場合)
df_read = spark.read.parquet("./monthly/year=2023/month=*")
# ソートしてから表示
df_read = df_read.sort(df_read.datetime)
df_read.show(5)
df_read.orderBy(df_read.datetime, ascending=False).show(5)【実行結果】 +-------------------+--------------------+---+ | datetime| measurement|day| +-------------------+--------------------+---+ |2023-10-01 00:00:00| 0.14901424590336979| 1| |2023-10-01 01:00:00|-0.03552694454855405| 1| |2023-10-01 02:00:00| 0.20621104213982733| 1| |2023-10-01 03:00:00| 0.4747651507543513| 1| |2023-10-01 04:00:00|-0.04643873812079383| 1| +-------------------+--------------------+---+ only showing top 5 rows +-------------------+-------------------+---+ | datetime| measurement|day| +-------------------+-------------------+---+ |2023-12-31 23:00:00| 0.6467656757056162| 31| |2023-12-31 22:00:00| 0.9802973425873306| 31| |2023-12-31 21:00:00| 0.6717463226887521| 31| |2023-12-31 20:00:00|0.23803665242220395| 31| |2023-12-31 19:00:00| 0.1319102042628495| 31| +-------------------+-------------------+---+ only showing top 5 rows
いくつかの特定フォルダを複数読み込む場合
フォルダ配下のいくつかのフォルダを複数読み込む場合は、以下のようにそれぞれのフォルダを読み込み union で結合することができます。
# いくつかの特定フォルダを複数読み込む場合
df1 = spark.read.parquet("./monthly/year=2023/month=11")
df2 = spark.read.parquet("./monthly/year=2023/month=12")
df_read = df1.union(df2)
# ソートしてから表示
df_read = df_read.sort(df_read.datetime)
df_read.show(5)
df_read.orderBy(df_read.datetime, ascending=False).show(5)【実行結果】 +-------------------+-------------------+---+ | datetime| measurement|day| +-------------------+-------------------+---+ |2023-11-01 00:00:00|-0.8046957296660957| 1| |2023-11-01 01:00:00|-1.1793663638979297| 1| |2023-11-01 02:00:00|-0.9072289030283197| 1| |2023-11-01 03:00:00| -1.191455395525611| 1| |2023-11-01 04:00:00| -1.149843255157325| 1| +-------------------+-------------------+---+ only showing top 5 rows +-------------------+-------------------+---+ | datetime| measurement|day| +-------------------+-------------------+---+ |2023-12-31 23:00:00| 0.6467656757056162| 31| |2023-12-31 22:00:00| 0.9802973425873306| 31| |2023-12-31 21:00:00| 0.6717463226887521| 31| |2023-12-31 20:00:00|0.23803665242220395| 31| |2023-12-31 19:00:00| 0.1319102042628495| 31| +-------------------+-------------------+---+ only showing top 5 rows
Delta Lake を使用する場合
Delta Lake を使う場合も基本的には同じ考え方で使用することができます。Delta Lake は、データレイク上で ACID 特性を提供し、スケーラブルで安全なデータ処理、管理を提供するための機能レイヤーでレイクハウス実装で活用できる技術です。
日付フォルダでデータを分割して出力する
Delta Lake 版に書き換えたプログラムは以下になります。これまで紹介してきたプログラムとの変化点は 2 点です。DataFrame の操作に関わる部分に変更はありません。
- Spark セッション初期化の際に Delta Lake のための
configを設定 - 保存の際に
format("delta")を指定し、parquetをsaveに変更
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
from pyspark.sql.functions import dayofmonth, month, year
# Sparkセッションの初期化
builder = (
SparkSession.builder.appName("partition_example")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
# Delta Lakeで動作するように設定
spark = configure_spark_with_delta_pip(builder).getOrCreate()
# pandasのDataFrameを作成
pandas_df = pd.DataFrame({
"datetime": h_range,
"measurement": measurement_values
}
)
# Spark DataFrameに変換
df = spark.createDataFrame(pandas_df)
# 年・月・日の列を作成しておく
df = df.withColumn("year", year("datetime"))
df = df.withColumn("month", month("datetime"))
df = df.withColumn("day", dayofmonth("datetime"))
df.show()
# 年単位でパーティション分割して保存
df.write.format("delta").mode("overwrite").partitionBy("year").save("./yearly-delta")
# 月単位でパーティション分割して保存
df.write.format("delta").mode("overwrite").partitionBy("year", "month").save("./monthly-delta")
# 日単位でパーティション分割して保存
df.write.format("delta").mode("overwrite").partitionBy("year", "month", "day").save("./daily-delta")実行すると Delta Lake としてファイルが保存されます。parquet でデータが保存されるとともに _delta_log というフォルダが生成されます。
データを読み込む方法
Delta Lake 版でデータを読み込む場合の変化としては、読み込みの際に format("delta") を指定し、parquet を load に変更します。上記で見てきた読み込みのパターンで見てみましょう。実行結果は変わらないため省略します。
なお、Delta Lake はワイルドカード指定をサポートしていない点に注意が必要です。(※Delta Lake のバージョンにより異なる可能性があります)
特定のフォルダを指定して読み込む場合
# 特定フォルダを指定して読み込む場合
df_read = spark.read.format("delta").load("./monthly-delta/year=2023/month=10")フォルダ配下をまとめて読み込む場合
# フォルダ配下をまとめて読み込む場合
df_read = spark.read.format("delta").load("./monthly-delta/year=2023")他のサブディレクトリやファイルがある場合
他のサブディレクトリがある場合、Delta Lake ではワイルドカード指定できないため、month=* のような指定ができません。そのため、以降で紹介するように複数フォルダをそれぞれ読み込んで union する必要があります。
# いくつかのフォルダを複数読み込む場合
df1 = spark.read.format("delta").load("./monthly-delta/year=2023/month=11")
df2 = spark.read.format("delta").load("./monthly-delta/year=2023/month=12")
df_read = df1.union(df2)レイクハウス構築の際には、Delta Lake の技術を採用する場合もあると思うので、使い方を覚えておくと良いかと思います。
まとめ
PySpark でデータを保存する際に、日付フォルダで分割してデータを保存する方法を解説しました。
データレイク(例:Amazon S3、Azure ADLS等)において、特に大量のデータを管理している際、日付フォルダにデータを分割して保存することは重要な方法です。これにより、クエリのパフォーマンス向上など多くの利点があります。
この記事では、簡単なサンプルデータを用意して、日付フォルダで分割して保存、読み込みを行う方法を実装例を用いて紹介しました。
日付フォルダ分割は、すべてのデータに適しているとは限りませんが、時系列データを扱う場合などには適しています。日付フォルダでの分割をうまく活用してデータ活用をしてもらえたらと思います。
上記で紹介しているソースコードについては GitHub にて公開しています。参考にしていただければと思います。







