PySparkでデータを保存する際に、日付フォルダで分割してデータを入出力する方法を解説します。
Contents
データを日付フォルダに分割して保存
PySparkでDataFrameの入出力を行う際は、write
やread
メソッドを使用します。DataFrameの基本的な入出力方法については「DataFrameの入出力 read, write」でまとめていますので参考にしてください。
データレイク(例:Amazon S3、Azure ADLS等)において、特に大量のデータを管理している際、日付フォルダにデータを分割して保存することは重要な方法です。これにより、クエリのパフォーマンス向上など多くの利点があります。
本記事では、PySparkにおいて日付フォルダでデータを効率的に分割して入出力する方法を紹介します。
データのフォルダ分割と保存:メリットと注意点
データをフォルダ分割して保存して扱うメリットや注意するべき点を紹介します。このあたりの考え方を十分理解されている方は読み飛ばしていただければと思います。
メリット
データレイクでデータを扱う場合には、効率的な管理ができるようにフォルダ構成をよく検討することが重要です。扱うデータの特徴により適切な構成は異なりますが、例えば、時系列データ保存では、日付ごとにフォルダを分ける方法はベストプラクティスの1つです。
日付ごとにフォルダを分けてデータ管理することには以下のようなメリットがあります。
項目 | 内容 |
---|---|
クエリのパフォーマンス向上 | 日付ごとにデータを分けることで、特定の日付範囲のクエリの実行時間を短縮できます。 |
データ整理/管理の簡素化 | 日付ごとにデータを分けることで、データの所在が明確になり、管理が容易になります。 |
スケーラビリティの向上 | 大規模データセットの効率的な分散処理が可能になります。 |
データ追加/更新の容易さ | 特定の日付データのみを効率的に追加・更新できます。 |
コスト削減 | データ取得量で従量課金されるクラウドサービスを使用する場合に、不要なデータ取得を避けることでコストを削減できます。 |
注意するべき点
分割単位の十分な検討が必要
フォルダ分割単位は、年、月、日、時間などいくつか検討できます。どういった単位で分割するかは、その後のデータ活用方法に応じて十分検討するべきです。いくつかの一般的なケースと階層分けの考え方を紹介します。
- 年単位(year=YYYY/):年単位でデータを集計したい場合に適しています。
- 月単位(year=YYYY/month=MM/):月単位でデータを集計する場合に適しています。BIでは、各種データを月集計するようなケースが多いため、このような時に向いています。
- 日単位(year=YYYY/month=MM/day=DD/):日単位でデータ分析をすることが中心の場合に適しています。
最適な分割方法を選択する
日付フォルダ分割は、一般的に時系列データに適していますが、すべてのデータに最適とは限りません。日付分割ではなく、他の目的に沿った意味のフォルダで整理した方がよい場合もあります。
また、どの階層まで分割するかも検討が必要です。例えば、データ量が多い時に分割単位を細かくしすぎると小さなファイルが多数生成される可能性があり、ファイルシステムに負荷をかける可能性があります。逆に、分割単位が大きい場合、読み込むファイルサイズが大きくなり、パフォーマンスに影響が起こる可能性もあります。
取り扱うデータの特性や分析の目的に応じて、パフォーマンスと効率のバランスを十分に検討することが重要になります。
以下のMicrosoft Azure Data Lake Storage Gen2のベストプラクティスのページにデータ構造検討の参考になる情報があり参考になります。
PySparkで日付フォルダに分割してデータを入出力する方法
ここからは、PySparkを使用して日付に基づいてデータを効率的に分割し、管理するための実践的な手法を順を追って説明します。
使用するデータの用意
以降の説明で使用するサンプルデータを以下のコードにより生成します。このデータは、2023/10/1~2024/3/31までで1時間毎に収集されるデータとして、sin波にノイズを加えた時系列データを想定しています。例えば、ある設備の計測器からのデータと考えると分かりやすいでしょう。
import pandas as pd import numpy as np import matplotlib.pyplot as plt # サンプル時系列データの生成 (1時間毎データ) h_range = pd.date_range(start="2023-10-01", end="2024-03-31 23:00:00", freq="H") hours = np.arange(len(h_range)) # サンプルデータを生成 (週周期で時間単位のsin波) sin_wave = np.sin(hours / (7 * 24)) # ランダムノイズを生成 noise = np.random.normal(0, 0.3, len(h_range)) # 波形にランダムノイズを付与 measurement_values = sin_wave + noise # 描画 plt.figure(figsize=(12, 6)) plt.plot(h_range, measurement_values, label='Measurement with Noise') plt.title('Hourly Measurement Values Over Time') plt.xlabel('Date and Time') plt.ylabel('Measurement Value') plt.legend() plt.grid(True) plt.show()
描画結果
結果として生成されるデータは上記の図のようになります。以降の例では、このデータを日付をベースに分割してデータを入出力する例を説明していきます。
日付フォルダでデータを分割して出力する
データを日付フォルダに分割して保存する場合は以下のようにwrite.partitionBy
が使用できます。なお、write
の基本的な使い方が分からない方は「DataFrameの入出力 read, write」でまとめていますので、そちらを参照してください。
from pyspark.sql import SparkSession from pyspark.sql.functions import dayofmonth, month, year # Sparkセッションの初期化 spark = SparkSession.builder.appName("partition_example").getOrCreate() # pandasのDataFrameを作成 pandas_df = pd.DataFrame({ "datetime": h_range, "measurement": measurement_values } ) # Spark DataFrameに変換 df = spark.createDataFrame(pandas_df) # 年・月・日の列を作成しておく df = df.withColumn("year", year("datetime")) df = df.withColumn("month", month("datetime")) df = df.withColumn("day", dayofmonth("datetime")) df.show() # 年単位でパーティション分割して保存 df.write.mode("overwrite").partitionBy("year").parquet("./yearly") # 月単位でパーティション分割して保存 df.write.mode("overwrite").partitionBy("year", "month").parquet("./monthly") # 日単位でパーティション分割して保存 df.write.mode("overwrite").partitionBy("year", "month", "day").parquet("./daily")
上記プログラムでは、まずpandasのDataFrameを作成し、それをSparkのDataFrameに変換しています。その後にdatetime
列から、year
, month
, day
の情報を抽出して列を追加し、これらを基にデータを分割して保存します。
ファイルを書き込むwrite.partitionBy
で基準とする列名を指定します。年月日で階層構造を作って作成したい場合は、,(カンマ)で区切って階層順に列を指定します。
結果としては年は「year=YYYY」、月は「month=MM」、日は「day=DD」という形式のフォルダが作成され、各フォルダにparquetファイルが出力されます。出力されるファイル数はSparkノードに依存するため複数ファイル出力される場合があります。
年単位フォルダでの保存
月単位フォルダでの保存
日単位フォルダでの保存
データを読み込む方法
日付フォルダで分割したファイルを読み込む際には、read
メソッドを使用します。いくつかのパターンでの読み込み例を紹介します。
特定のフォルダを指定して読み込む場合
特定の年月のフォルダを指定して読み込む場合は、以下のように対象フォルダをread.parquet
に指定します。
# 特定フォルダを指定して読み込む場合 df_read = spark.read.parquet("./monthly/year=2023/month=10") # ソートしてから表示 df_read = df_read.sort(df_read.datetime) df_read.show(5) df_read.orderBy(df_read.datetime, ascending=False).show(5)
【実行結果】 +-------------------+--------------------+---+ | datetime| measurement|day| +-------------------+--------------------+---+ |2023-10-01 00:00:00| 0.14901424590336979| 1| |2023-10-01 01:00:00|-0.03552694454855405| 1| |2023-10-01 02:00:00| 0.20621104213982733| 1| |2023-10-01 03:00:00| 0.4747651507543513| 1| |2023-10-01 04:00:00|-0.04643873812079383| 1| +-------------------+--------------------+---+ only showing top 5 rows +-------------------+-------------------+---+ | datetime| measurement|day| +-------------------+-------------------+---+ |2023-10-31 23:00:00|-0.9504823233260842| 31| |2023-10-31 22:00:00|-1.1440374606662784| 31| |2023-10-31 21:00:00|-1.3387133335934447| 31| |2023-10-31 20:00:00|-1.5053170384963321| 31| |2023-10-31 19:00:00| -0.64692450768512| 31| +-------------------+-------------------+---+ only showing top 5 rows
上記では、読み込んだ先頭と末尾の5行を出力していますが、指定した月のファイルが読み込めていることが分かるかと思います。月の例で紹介していますが、年や日の分割をした場合についても同様です。
フォルダ配下をまとめて読み込む場合
例えば、年・月で階層化したフォルダ構成にしていた場合に、年のフォルダ配下にあるすべての月フォルダをまとめて読み込みたい場合があります。このような場合には、以下のように年フォルダを指定することで対応できます。
# フォルダ配下をまとめて読み込む場合 (以下どちらでも同じ) df_read = spark.read.parquet("./monthly/year=2023") # df_read = spark.read.parquet("./monthly/year=2023/*") # ソートしてから表示 df_read = df_read.sort(df_read.datetime) df_read.show(5) df_read.orderBy(df_read.datetime, ascending=False).show(5)
【実行結果】 +-------------------+--------------------+---+-----+ | datetime| measurement|day|month| +-------------------+--------------------+---+-----+ |2023-10-01 00:00:00| 0.14901424590336979| 1| 10| |2023-10-01 01:00:00|-0.03552694454855405| 1| 10| |2023-10-01 02:00:00| 0.20621104213982733| 1| 10| |2023-10-01 03:00:00| 0.4747651507543513| 1| 10| |2023-10-01 04:00:00|-0.04643873812079383| 1| 10| +-------------------+--------------------+---+-----+ only showing top 5 rows +-------------------+-------------------+---+-----+ | datetime| measurement|day|month| +-------------------+-------------------+---+-----+ |2023-12-31 23:00:00| 0.6467656757056162| 31| 12| |2023-12-31 22:00:00| 0.9802973425873306| 31| 12| |2023-12-31 21:00:00| 0.6717463226887521| 31| 12| |2023-12-31 20:00:00|0.23803665242220395| 31| 12| |2023-12-31 19:00:00| 0.1319102042628495| 31| 12| +-------------------+-------------------+---+-----+ only showing top 5 rows
年フォルダ配下をまとめて指定したい場合は、上記例のように"./monthly/year=2023"
といった形で年フォルダを指定することができます。また、ワイルドカードでの読み込みもできるので"./monthly/year=2023/*"
としても同様です。
このようにすることで、複数フォルダをまとめて読み込むことが可能です。
他のサブディレクトリやファイルがある場合
上記のようにフォルダ配下をまとめて指定する場合で、他のサブディレクトリやファイルがある場合には、以下のように月フォルダをmonth=*
のようにワイルドカード指定することが可能です。
# フォルダ配下をまとめて読み込む場合 (他のサブディレクトリやファイルがある場合) df_read = spark.read.parquet("./monthly/year=2023/month=*") # ソートしてから表示 df_read = df_read.sort(df_read.datetime) df_read.show(5) df_read.orderBy(df_read.datetime, ascending=False).show(5)
【実行結果】 +-------------------+--------------------+---+ | datetime| measurement|day| +-------------------+--------------------+---+ |2023-10-01 00:00:00| 0.14901424590336979| 1| |2023-10-01 01:00:00|-0.03552694454855405| 1| |2023-10-01 02:00:00| 0.20621104213982733| 1| |2023-10-01 03:00:00| 0.4747651507543513| 1| |2023-10-01 04:00:00|-0.04643873812079383| 1| +-------------------+--------------------+---+ only showing top 5 rows +-------------------+-------------------+---+ | datetime| measurement|day| +-------------------+-------------------+---+ |2023-12-31 23:00:00| 0.6467656757056162| 31| |2023-12-31 22:00:00| 0.9802973425873306| 31| |2023-12-31 21:00:00| 0.6717463226887521| 31| |2023-12-31 20:00:00|0.23803665242220395| 31| |2023-12-31 19:00:00| 0.1319102042628495| 31| +-------------------+-------------------+---+ only showing top 5 rows
いくつかの特定フォルダを複数読み込む場合
フォルダ配下のいくつかのフォルダを複数読み込む場合は、以下のようにそれぞれのフォルダを読み込みunion
で結合することで対応ができます。
# いくつかの特定フォルダを複数読み込む場合 df1 = spark.read.parquet("./monthly/year=2023/month=11") df2 = spark.read.parquet("./monthly/year=2023/month=12") df_read = df1.union(df2) # ソートしてから表示 df_read = df_read.sort(df_read.datetime) df_read.show(5) df_read.orderBy(df_read.datetime, ascending=False).show(5)
【実行結果】 +-------------------+-------------------+---+ | datetime| measurement|day| +-------------------+-------------------+---+ |2023-11-01 00:00:00|-0.8046957296660957| 1| |2023-11-01 01:00:00|-1.1793663638979297| 1| |2023-11-01 02:00:00|-0.9072289030283197| 1| |2023-11-01 03:00:00| -1.191455395525611| 1| |2023-11-01 04:00:00| -1.149843255157325| 1| +-------------------+-------------------+---+ only showing top 5 rows +-------------------+-------------------+---+ | datetime| measurement|day| +-------------------+-------------------+---+ |2023-12-31 23:00:00| 0.6467656757056162| 31| |2023-12-31 22:00:00| 0.9802973425873306| 31| |2023-12-31 21:00:00| 0.6717463226887521| 31| |2023-12-31 20:00:00|0.23803665242220395| 31| |2023-12-31 19:00:00| 0.1319102042628495| 31| +-------------------+-------------------+---+ only showing top 5 rows
Delta Lakeを使用する場合
Delta Lakeを使う場合も基本的には同じ考え方で使用することができます。Delta Lakeは、データレイク上でACID特性を提供し、スケーラブルで安全なデータ処理、管理を提供するための機能レイヤーでレイクハウス実装で活用できる技術です。
Delta Lakeの概要や基本的な使い方の基本については「Delta Lakeの使い方の基本」にまとめていますので興味があれば参考にしてください。
日付フォルダでデータを分割して出力する
Delta Lake版に書き換えた出力プログラムは以下のようになります。これまで紹介してきたプログラムとの変化点は2点です。DataFrameの操作に関わる部分には変更はありません。
- Sparkセッション初期化の際に、Delta Lakeのための
config
設定 - 保存の際に、
format("delta")
を指定し、parquet
をsave
に変更
from pyspark.sql import SparkSession from delta import configure_spark_with_delta_pip from pyspark.sql.functions import dayofmonth, month, year # Sparkセッションの初期化 builder = ( SparkSession.builder.appName("partition_example") .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") ) # Delta Lakeで動作するように設定 spark = configure_spark_with_delta_pip(builder).getOrCreate() # pandasのDataFrameを作成 pandas_df = pd.DataFrame({ "datetime": h_range, "measurement": measurement_values } ) # Spark DataFrameに変換 df = spark.createDataFrame(pandas_df) # 年・月・日の列を作成しておく df = df.withColumn("year", year("datetime")) df = df.withColumn("month", month("datetime")) df = df.withColumn("day", dayofmonth("datetime")) df.show() # 年単位でパーティション分割して保存 df.write.format("delta").mode("overwrite").partitionBy("year").save("./yearly-delta") # 月単位でパーティション分割して保存 df.write.format("delta").mode("overwrite").partitionBy("year", "month").save("./monthly-delta") # 日単位でパーティション分割して保存 df.write.format("delta").mode("overwrite").partitionBy("year", "month", "day").save("./daily-delta")
実行するとDelta Lakeとしてファイルが保存されます。parquetでデータが保存されるとともに_delta_logというフォルダが生成されます。
データを読み込む方法
Delta Lake版でデータを読み込む場合の変化としては、読み込みの際に、format("delta")
を指定し、parquet
をload
に変更します。上記で見てきた読み込みのパターンで見てみましょう。実行結果は変わらないため省略します。
なお、Delta Lakeはワイルドカードでの指定をサポートしていない点に注意が必要です。(※Delta Lakeのバージョンにより異なる可能性があります)
特定のフォルダを指定して読み込む場合
# 特定フォルダを指定して読み込む場合 df_read = spark.read.format("delta").load("./monthly-delta/year=2023/month=10")
フォルダ配下をまとめて読み込む場合
# フォルダ配下をまとめて読み込む場合 df_read = spark.read.format("delta").load("./monthly-delta/year=2023")
Delta Lakeではワイルドカード指定できないため、"./monthly/year=2023/*"
というような指定はできません。(※Delta Lakeのバージョンにより異なる可能性があります)
他のサブディレクトリやファイルがある場合
他のサブディレクトリがある場合、Delta Lakeではワイルドカード指定できないため、month=*
のような指定ができません。(※Delta Lakeのバージョンにより異なる可能性があります)
以下の「いくつかの特定フォルダを複数読み込む場合」の例のように、複数フォルダをそれぞれ読み込んでunion
する必要があります。
いくつかの特定フォルダを複数読み込む場合
# いくつかのフォルダを複数読み込む場合 df1 = spark.read.format("delta").load("./monthly-delta/year=2023/month=11") df2 = spark.read.format("delta").load("./monthly-delta/year=2023/month=12") df_read = df1.union(df2)
レイクハウス構築の際には、Delta Lakeの技術を採用する場合もあると思うので、使い方を覚えておいてもらうと良いかと思います。
まとめ
PySparkでデータを保存する際に、日付フォルダで分割してデータを保存する方法を解説しました。
データレイク(例:Amazon S3、Azure ADLS等)において、特に大量のデータを管理している際、日付フォルダにデータを分割して保存することは重要な方法です。これにより、クエリのパフォーマンス向上など多くの利点があります。
本記事では、簡単なサンプルデータを用意して、日付フォルダで分割して保存し、読み込む方法について実装例を紹介しました。
日付フォルダ分割は、すべてのデータに適しているとは限りませんが、例えば時系列データを扱う場合などには適しています。ぜひ本記事で紹介した日付フォルダでの分割をうまく活用してデータ活用してもらえたらと思います。
上記で紹介しているソースコードについてはgithubにて公開しています。参考にしていただければと思います。