PySpark

【PySpark】DataFrameの入出力 read, write

【PySpark】DataFrameの入出力 read write

PySparkで、DataFrameの入出力方法について解説します。データ分析でよく利用されるCSVParquetORCJSONといったファイル形式について扱います。

DataFrameの入出力

PySparkのDataFrameは、Sparkを使用してデータ処理を行うためのデータ構造です。

PySparkは、分散処理フレームワークであるApache SparkのPython用APIです。SparkとPySparkの概要やSparkアプリケーションの概念については「Apache SparkとPySparkの概要」や「Sparkアプリケーションの概念を理解する」でまとめていますので興味があれば参考にしてください

本記事では、PySparkでデータを入出力する方法について紹介します。データ分析でよく使用されるCSV、Parquet、ORC、JSONといったファイル形式について説明します。

なお、実行環境としては「PySparkの実行環境をDockerで用意する方法」で説明しているDockerでのJupyter Notebook実行環境を使用しています。

ファイル形式

今回の入出力で扱うファイル形式(CSV, Parquet, ORC, JSON)について簡単に説明をしておきます。各ファイルに関して詳しい人は読み飛ばしていただいて結構です。

CSVファイル

CSV(Comma-Separated Values)ファイルは、データをテキストとして表現するためのシンプルなフォーマットの一つです。名前の通り、CSVファイルのデータはコンマで区切られた値で構成されています。このフォーマットは、データベースやスプレッドシートからのデータのエクスポートやインポートによく使われます。

Parquetファイル

Parquetファイルは、データ分析でよく使用される列指向のストレージフォーマットです。大量のデータの効率的な読み取り、書き込み、および圧縮をサポートするため、大規模なデータ分析に適しています。

Parquetファイルは、バイナリフォーマットのため直接参照できないので、データ加工の処理フローではParquetで効率よく処理し、データの利用ユーザーに公開する際にCSVファイルにして提供するといったことも行われます。

ORCファイル

ORC(Optimized Row Columnar)ファイルは、主にHadoopエコシステムで使用される列指向のストレージフォーマットの一つです。ORCは特にHiveのために設計されており、大量のデータの効率的な読み取り、書き込み、圧縮をサポートしています。

ORCは、特にHiveクエリのパフォーマンスを向上させることを目的として設計されているため、Hiveを使用する場合はHadoopエコシステムを中心にデータ処理する場合に選択されることが多いです。また、他のSQLベースのデータベースシステムやデータ処理フレームワークでもORCフォーマットが利用されることがあります。

ParquetとORCのどちらを選択するかは、使用するツールやエコシステム等の要件によって異なります。

JSONファイル

JSON(JavaScript Object Notation)ファイルは、JavaScriptの一部として生まれたファイルフォーマットですが、現在は多くのプログラミング言語でサポートされています。また、WebAPI(特にRESTful API)で、クライアントとサーバー間でデータをやり取りする際の標準的な手段として広く利用されています。

ファイルの書き込み write

PySparkのDataFrameでのデータの書き込みは、以下の例のようにwriteメソッドで実行できます。writeは、DataFrameWriterクラスのメソッドとして実装されています。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameInputOutput").getOrCreate()

# スキーマを定義
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

# データを作成
data = [("Taro", 25), ("Hanako", 30), ("Yuki", 20)]

# スキーマを指定してデータフレームを生成
df = spark.createDataFrame(data, schema=schema)

# CSVファイルへの書き込み
(df.write
    .option("header", "true")
    .mode("overwrite")
    .csv("./output_csv"))

# Parquetファイルへの書き込み
(df.write
    .mode("overwrite")
    .parquet("./output_parquet"))

# ORCファイルへの書き込み
(df.write
    .mode("overwrite")
    .orc("./output_orc"))

# JSONファイルへの書き込み
(df.write
    .mode("overwrite")
    .json("./output_json"))

# SparkSessionを終了
spark.stop()

出力結果例 (CSVファイルの場合、他のフォーマットも同様)

pyspark write csv 結果

上記例では、Sparkのセッションを、SparkSession.builder.appName("DataFrameInputOutput").getOrCreate()という部分でセッションを初期化しています。"DataFrameExample"の部分はSparkアプリケーションの名前です。なお、SparkSessionを終了する際は、stop()を使用します。

書き込みのためのwriteは、書き込み対象のDataFrameを使ってdf.write.csv("output_path")のように使うことができます。”output_path”の部分にはファイルを出力するフォルダ名を指定します。フォルダがない場合は作成されます。

Parquetファイルは、parquetメソッド、ORCファイルは、orcメソッドというように各ファイルフォーマット名称のメソッドが用意されています。

また、以降で説明するoptionmodeといったメソッドと併用することで出力の挙動をコントロールできます。

optionメソッド

writeメソッドでは、以下のようにoptionメソッドでオプション指定することができます。

# CSVファイルへの書き込み
(df.write
    .option("header", "true")
    .mode("overwrite")
    .csv("./output_csv"))

上記例では、optionメソッドで"header""true"にすることで、ヘッダー行を出力する指定をしています。

Apache SparkのDataFrameのoptionメソッドは、様々な読み取りや書き込みオペレーションの設定を変更するためのキーと値のペアを指定する際に使用されます。このメソッドは、主にファイルを読み込むreadや、後述するデータをファイルに書き込むwriteと組み合わせて使用されます。

ParquetやORC等については、スキーマ情報を含むのでheaderの指定は必要ありません。なお、上記でDataFrameを作成する際に、StructTypeStructFieldを使ってスキーマを明確に指定しています。

PySparkでは必ずしもスキーマを指定しなくても使えますが、スキーマ情報を適切にParquetファイルなどに保存できるように指定して作成するとよいでしょう。DataFrameの作成とスキーマの定義については「DataFrameの作成方法とスキーマ」でまとめていますので興味があれば参考にしてください。

上記で紹介した"header"以外にもoptionには指定できるものがあるので調べてみてください。例えば、CSVファイルへの書き込みでは、他にも"delimiter"等のオプションを指定できます。

なお、PySparkでは、optionや後述するmodeといったメソッドを上記例のようにチェーンでつなげる書き方が一般的に使われます。改行する際には例のように()で囲っておくと可読性高く記載が可能です。

modeメソッド

writeメソッドでは、以下のようにmodeメソッドでモード指定ができます。

# CSVファイルへの書き込み
(df.write
    .option("header", "true")
    .mode("overwrite")
    .csv("./output_csv"))

上記例では、"overwrite"を指定することで対象フォルダのファイルを上書きするように動作します。

modeメソッドで指定できる代表的なモードは以下の通りです。

モード概要
overwrite既存のデータを上書きします。指定したパスにデータが存在する場合、それを削除して新しいデータを書き込みます。
append既存のデータに追加します。指定したパスに既にデータが存在する場合、新しいデータをその後に追加します。
ignore既存のデータが存在する場合、何も行わずに操作をスキップします。つまり、新しいデータの書き込みを行いません。
error または errorifexists既存のデータが存在する場合、エラーをスローします。このモードがデフォルトの振る舞いです。

複数ファイルに出力される理由

PySparkによる出力すると上記出力結果例で紹介したように「part-xxxxx.csv」というような複数ファイルが出力されます。Sparkでは分散処理環境で処理を実行しており、それぞれの処理ノードの各タスクで各パーティションを処理して書き込みを行っているからです。

これらの理由を正確に理解するにはSparkのJobやパーティション等の処理の概念を理解しておくことが有効です。「Sparkアプリケーションの概念を理解する」でSparkアプリケーションの構成やデータ構造、処理概念についてまとめていますので興味があれば参考にしてください。

ファイルの読み込み read

PySparkのDataFrameへのデータの読み込みは、以下の例のようにreadメソッドで実行できます。readは、DataFrameReaderクラスのメソッドとして実装されています。

以下の例では、上記のwriteで出力したファイルを読み込んで表示しています。

from pyspark.sql import SparkSession

# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameInputOutput").getOrCreate()

# CSVファイルの読み込み
df_csv = (spark.read
                .option("header", "true")
                .option("inferSchema", "true")
                .csv("./output_csv"))
df_csv.printSchema()
df_csv.show()

# Parquetファイルへの書き込み
df_parquet = (spark.read
                    .parquet("./output_parquet"))
df_parquet.printSchema()
df_parquet.show()

# ORCファイルへの書き込み
df_orc = (spark.read
                .orc("./output_orc"))
df_orc.printSchema()
df_orc.show()

# JSONファイルへの書き込み
df_json = (spark.read
                .json("./output_json"))
df_json.printSchema()
df_json.show()

# SparkSessionを終了
spark.stop()
【実行結果】
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+------+---+
|  name|age|
+------+---+
|Hanako| 30|
|  Taro| 25|
|  Yuki| 20|
+------+---+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+------+---+
|  name|age|
+------+---+
|Hanako| 30|
|  Taro| 25|
|  Yuki| 20|
+------+---+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+------+---+
|  name|age|
+------+---+
|Hanako| 30|
|  Taro| 25|
|  Yuki| 20|
+------+---+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+---+------+
|age|  name|
+---+------+
| 30|Hanako|
| 25|  Taro|
| 20|  Yuki|
+---+------+

SparkSessionの部分の説明は、上記で説明しているため割愛します。readは、SparkSessionを介してspark.read.csv("file_path")のように使用します。

また、以降で説明するoptionメソッドと併用することで出力の挙動をコントロールできます。

optionメソッド

readメソッドでは、以下のようにoptionメソッドで指定することができます。

# CSVファイルの読み込み
df_csv = (spark.read
                .option("header", "true")
                .option("inferSchema", "true")
                .csv("./output_csv"))
df_csv.printSchema()
df_csv.show()

上記の例では、optionメソッドで"header""true"にすることで、ヘッダー行を読み込むように指定をしています。また、CSVファイルはスキーマ情報を持っていないため、"inferSchema"optionで指定することでスキーマ情報を推定しながら読み込むことができます。 スキーマ情報は、printSchemaで確認することができます。

Parquet等のファイルフォーマットはスキーマ情報を含むため、"header""inferSchema"を指定する必要はありません。

schemaメソッドによるスキーマ指定

StructTypeとStructFieldでスキーマを定義し、以下のようにschemaメソッドで指定することで、スキーマに従った形でデータを読み込むことが可能です。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameInputOutput").getOrCreate()

# スキーマを定義
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

# CSVファイルの読み込み
df_csv = (spark.read
                .option("header", "true")
                .schema(schema)
                .csv("./output_csv"))
df_csv.printSchema()
df_csv.show()

# Parquetファイルへの書き込み
df_parquet = (spark.read
                    .schema(schema)
                    .parquet("./output_parquet"))
df_parquet.printSchema()
df_parquet.show()

# ORCファイルへの書き込み
df_orc = (spark.read
                .schema(schema)
                .orc("./output_orc"))
df_orc.printSchema()
df_orc.show()

# JSONファイルへの書き込み
df_json = (spark.read
                .schema(schema)
                .json("./output_json"))
df_json.printSchema()
df_json.show()

# SparkSessionを終了
spark.stop()
【実行結果】
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+------+---+
|  name|age|
+------+---+
|Hanako| 30|
|  Taro| 25|
|  Yuki| 20|
+------+---+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+------+---+
|  name|age|
+------+---+
|Hanako| 30|
|  Taro| 25|
|  Yuki| 20|
+------+---+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+------+---+
|  name|age|
+------+---+
|Hanako| 30|
|  Taro| 25|
|  Yuki| 20|
+------+---+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+------+---+
|  name|age|
+------+---+
|Hanako| 30|
|  Taro| 25|
|  Yuki| 20|
+------+---+

上記のようにスキーマを指定すると、データを読み込む際にPySparkが各フィールドのデータ型を推測する必要がなくなり、大きなデータセットを扱う際に読み込み時間の短縮が期待ができます。

また、データの正確性の保証や、エラーの早期発見ができますし、コード内にスキーマを明示することにより、他の開発者がコードを読む際に明確になり可読性が高まります。

出力ファイルを1つに統合する方法 coalesce, repartition

上記の例では、出力ファイルが複数になりましたが、それはSparkが分散処理をしているためでした。ファイルの読み込みは、複数ファイルでも問題なく読み込み可能です。

しかし、利用者に提供する場合等、場合によってはファイル数をまとめた方が都合がよい場合もあるため、ファイルを1つに統合する方法についても見ておきましょう。

なお、1つにまとめる操作は、処理ノード間のデータ移動が発生することがあるため、加工処理の途中では不必要に統合することはせずに分割されたファイルのまま扱い、利用者に提供するレベルまでデータが整理された際にまとめるといったことをするのがよいかと思います。

coalesceによる統合

coalesceメソッドを使うことで、パーティション数を減少させることができ、単一パーティションに統合する操作を行うことができます。これにより、ファイル出力の数をコントロールすることができます。

以下のように複数ファイルに1つのファイルに統合することができます。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameInputOutput").getOrCreate()

# スキーマを定義
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

# データを作成
data = [("Taro", 25), ("Hanako", 30), ("Yuki", 20)]

# スキーマを指定してデータフレームを生成
df = spark.createDataFrame(data, schema=schema)


# CSVファイルへの書き込み
(df.coalesce(1).write
    .option("header", "true")
    .mode("overwrite")
    .csv("./output_csv_onefile"))

# Parquetファイルへの書き込み
(df.coalesce(1).write
    .mode("overwrite")
    .parquet("./output_parquet_onefile"))

# ORCファイルへの書き込み
(df.coalesce(1).write
    .mode("overwrite")
    .orc("./output_orc_onefile"))

# JSONファイルへの書き込み
(df.coalesce(1).write
    .mode("overwrite")
    .json("./output_json_onefile"))

# SparkSessionを終了
spark.stop()

出力結果例 (CSVファイルの場合、他のフォーマットも同様)

coalesce 出力例

coalesceを使用して出力する際には、df.coalesce(1).writeというようにします。このようにすることで上記結果例のように1つのcsvファイルとして出力することが可能です。

repartitionによる統合

coalesceと同じようなメソッドとしてrepartitionがあります。以下のように複数ファイルに1つのファイルに統合することができます。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameInputOutput").getOrCreate()

# スキーマを定義
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

# データを作成
data = [("Taro", 25), ("Hanako", 30), ("Yuki", 20)]

# スキーマを指定してデータフレームを生成
df = spark.createDataFrame(data, schema=schema)


# CSVファイルへの書き込み
(df.repartition(1).write
    .option("header", "true")
    .mode("overwrite")
    .csv("./output_csv_onefile"))

# Parquetファイルへの書き込み
(df.repartition(1).write
    .mode("overwrite")
    .parquet("./output_parquet_onefile"))

# ORCファイルへの書き込み
(df.repartition(1).write
    .mode("overwrite")
    .orc("./output_orc_onefile"))

# JSONファイルへの書き込み
(df.repartition(1).write
    .mode("overwrite")
    .json("./output_json_onefile"))

# SparkSessionを終了
spark.stop()

出力結果例 (CSVファイルの場合、他のフォーマットも同様)

repartition 出力例

repartitionを使用して出力する際には、df.repartition(1).writeというようにします。上記のようにrepartitionでも同様にファイルをまとめることが可能です。

coalesceとrepartitionの使い分けの基準

上記の通りcoalescerepartitionという類似するメソッドを紹介しました。しかし、どちらをいつ使えばよいのかという使い分けに悩むかと思いますので説明します。もともと、coalescerepartitionは、動作と使用目的が異なっています。

coalesceは、パーティション数を減少させるために使用されます。また、シャッフルという処理ノード間のデータ移動が発生する負荷の高い処理を回避するように複数のパーティションを単一パーティションに統合する操作を行います。そのため、大量の小さなパーティションを統合して効率的なサイズのパーティションを作成する場合に適しています。

なお、coalesceで処理ノード間の移動が全く発生しないということではないので注意してください。例えば、処理ノード1と処理ノード2があり、それぞれに2つのパーティションが存在する場合、coalesce(1)を使用すると、全てのパーティションを1つにまとめる必要があります。その結果、一部のデータをノード間で移動する必要があります。この場合、coalesceは、既存パーティションの一部を移動して、他のパーティションと結合する最も効率的な方法を選択します。

また、coalesceは、パーティションを増やす場合には使用できません。パーティション数を増やす場合には、repartitionの方を使う必要があります。

repartitionは、データをシャッフルして新しいパーティションに再分配します。使い方としては、パーティション数を増やす場合や、特定の列に基づいてデータを再分配する場合に使用します。PySparkでは、処理ノード数に対して適切なパーティション数を持つことで、効率的な処理が可能となります。この調整を行う際にrepartitionを使用するのが適切です。

coalescerepartitionの使い分けの一つの基準として簡単にまとめてみると以下のようになります。

  • パーティション数を減少させる場合:coalesce
  • パーティション数を増やす場合:repartition
  • 特定の列に基づいてパーティションを再分割する場合:repartition

状況によりどちらを選択するべきか検討するようにしてみてください。

まとめ

PySparkで、DataFrameの入出力方法について解説しました。データ分析でよく利用されるCSV、Parquet、ORC、JSONといったファイル形式について紹介しています。

PySparkのDataFrameでのデータの書き込みは、DataFrameWriterクラスにwriteメソッドとして、読み込みはDataFrameReaderクラスのreadメソッドとして実装されています。それぞれのメソッドには、option等のメソッドを指定することで挙動をコントロールすることが可能です。

また、Sparkを使用するとファイルの出力が複数になるという特徴があります。Sparkでは分散処理環境で処理を実行しており、それぞれの処理ノードの各タスクで各パーティションを処理して書き込みを行っているためです。必要に応じてファイル数を制御するcoalescerepartitionといったメソッドについても紹介しています。

PySparkを用いたデータ処理の流れの中で、ファイルの入出力であるreadwriteは非常に重要なものになります。ぜひ使い方をしっかりと理解しましょう。