PySparkで、DataFrameの入出力方法について解説します。データ分析でよく利用されるCSV、Parquet、ORC、JSONといったファイル形式について扱います。
Contents
DataFrameの入出力
PySparkのDataFrameは、Sparkを使用してデータ処理を行うためのデータ構造です。
PySparkは、分散処理フレームワークであるApache SparkのPython用APIです。SparkとPySparkの概要やSparkアプリケーションの概念については「Apache SparkとPySparkの概要」や「Sparkアプリケーションの概念を理解する」でまとめていますので興味があれば参考にしてください
本記事では、PySparkでデータを入出力する方法について紹介します。データ分析でよく使用されるCSV、Parquet、ORC、JSONといったファイル形式について説明します。
なお、実行環境としては「PySparkの実行環境をDockerで用意する方法」で説明しているDockerでのJupyter Notebook実行環境を使用しています。
ファイル形式
今回の入出力で扱うファイル形式(CSV, Parquet, ORC, JSON)について簡単に説明をしておきます。各ファイルに関して詳しい人は読み飛ばしていただいて結構です。
CSVファイル
CSV(Comma-Separated Values)ファイルは、データをテキストとして表現するためのシンプルなフォーマットの一つです。名前の通り、CSVファイルのデータはコンマで区切られた値で構成されています。このフォーマットは、データベースやスプレッドシートからのデータのエクスポートやインポートによく使われます。
Parquetファイル
Parquetファイルは、データ分析でよく使用される列指向のストレージフォーマットです。大量のデータの効率的な読み取り、書き込み、および圧縮をサポートするため、大規模なデータ分析に適しています。
Parquetファイルは、バイナリフォーマットのため直接参照できないので、データ加工の処理フローではParquetで効率よく処理し、データの利用ユーザーに公開する際にCSVファイルにして提供するといったことも行われます。
ORCファイル
ORC(Optimized Row Columnar)ファイルは、主にHadoopエコシステムで使用される列指向のストレージフォーマットの一つです。ORCは特にHiveのために設計されており、大量のデータの効率的な読み取り、書き込み、圧縮をサポートしています。
ORCは、特にHiveクエリのパフォーマンスを向上させることを目的として設計されているため、Hiveを使用する場合はHadoopエコシステムを中心にデータ処理する場合に選択されることが多いです。また、他のSQLベースのデータベースシステムやデータ処理フレームワークでもORCフォーマットが利用されることがあります。
ParquetとORCのどちらを選択するかは、使用するツールやエコシステム等の要件によって異なります。
JSONファイル
JSON(JavaScript Object Notation)ファイルは、JavaScriptの一部として生まれたファイルフォーマットですが、現在は多くのプログラミング言語でサポートされています。また、WebAPI(特にRESTful API)で、クライアントとサーバー間でデータをやり取りする際の標準的な手段として広く利用されています。
ファイルの書き込み write
PySparkのDataFrameでのデータの書き込みは、以下の例のようにwrite
メソッドで実行できます。write
は、DataFrameWriter
クラスのメソッドとして実装されています。
from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType # SparkSessionの初期化 spark = SparkSession.builder.appName("DataFrameInputOutput").getOrCreate() # スキーマを定義 schema = StructType([ StructField("name", StringType(), True), StructField("age", IntegerType(), True), ]) # データを作成 data = [("Taro", 25), ("Hanako", 30), ("Yuki", 20)] # スキーマを指定してデータフレームを生成 df = spark.createDataFrame(data, schema=schema) # CSVファイルへの書き込み (df.write .option("header", "true") .mode("overwrite") .csv("./output_csv")) # Parquetファイルへの書き込み (df.write .mode("overwrite") .parquet("./output_parquet")) # ORCファイルへの書き込み (df.write .mode("overwrite") .orc("./output_orc")) # JSONファイルへの書き込み (df.write .mode("overwrite") .json("./output_json")) # SparkSessionを終了 spark.stop()
出力結果例 (CSVファイルの場合、他のフォーマットも同様)
上記例では、Sparkのセッションを、SparkSession.builder.appName("DataFrameInputOutput").getOrCreate()
という部分でセッションを初期化しています。"DataFrameExample"
の部分はSparkアプリケーションの名前です。なお、SparkSessionを終了する際は、stop()
を使用します。
書き込みのためのwrite
は、書き込み対象のDataFrameを使ってdf.write.csv("output_path")
のように使うことができます。”output_path”の部分にはファイルを出力するフォルダ名を指定します。フォルダがない場合は作成されます。
Parquetファイルは、parquet
メソッド、ORCファイルは、orc
メソッドというように各ファイルフォーマット名称のメソッドが用意されています。
また、以降で説明するoption
やmode
といったメソッドと併用することで出力の挙動をコントロールできます。
optionメソッド
writeメソッドでは、以下のようにoption
メソッドでオプション指定することができます。
# CSVファイルへの書き込み (df.write .option("header", "true") .mode("overwrite") .csv("./output_csv"))
上記例では、option
メソッドで"header"
を"true"
にすることで、ヘッダー行を出力する指定をしています。
Apache SparkのDataFrameのoption
メソッドは、様々な読み取りや書き込みオペレーションの設定を変更するためのキーと値のペアを指定する際に使用されます。このメソッドは、主にファイルを読み込むread
や、後述するデータをファイルに書き込むwrite
と組み合わせて使用されます。
ParquetやORC等については、スキーマ情報を含むのでheader
の指定は必要ありません。なお、上記でDataFrameを作成する際に、StructType
やStructField
を使ってスキーマを明確に指定しています。
PySparkでは必ずしもスキーマを指定しなくても使えますが、スキーマ情報を適切にParquetファイルなどに保存できるように指定して作成するとよいでしょう。DataFrameの作成とスキーマの定義については「DataFrameの作成方法とスキーマ」でまとめていますので興味があれば参考にしてください。
上記で紹介した"header"
以外にもoption
には指定できるものがあるので調べてみてください。例えば、CSVファイルへの書き込みでは、他にも"delimiter"
等のオプションを指定できます。
なお、PySparkでは、option
や後述するmode
といったメソッドを上記例のようにチェーンでつなげる書き方が一般的に使われます。改行する際には例のように()
で囲っておくと可読性高く記載が可能です。
modeメソッド
write
メソッドでは、以下のようにmode
メソッドでモード指定ができます。
# CSVファイルへの書き込み (df.write .option("header", "true") .mode("overwrite") .csv("./output_csv"))
上記例では、"overwrite"
を指定することで対象フォルダのファイルを上書きするように動作します。
mode
メソッドで指定できる代表的なモードは以下の通りです。
モード | 概要 |
---|---|
overwrite | 既存のデータを上書きします。指定したパスにデータが存在する場合、それを削除して新しいデータを書き込みます。 |
append | 既存のデータに追加します。指定したパスに既にデータが存在する場合、新しいデータをその後に追加します。 |
ignore | 既存のデータが存在する場合、何も行わずに操作をスキップします。つまり、新しいデータの書き込みを行いません。 |
error または errorifexists | 既存のデータが存在する場合、エラーをスローします。このモードがデフォルトの振る舞いです。 |
複数ファイルに出力される理由
PySparkによる出力すると上記出力結果例で紹介したように「part-xxxxx.csv」というような複数ファイルが出力されます。Sparkでは分散処理環境で処理を実行しており、それぞれの処理ノードの各タスクで各パーティションを処理して書き込みを行っているからです。
これらの理由を正確に理解するにはSparkのJobやパーティション等の処理の概念を理解しておくことが有効です。「Sparkアプリケーションの概念を理解する」でSparkアプリケーションの構成やデータ構造、処理概念についてまとめていますので興味があれば参考にしてください。
ファイルの読み込み read
PySparkのDataFrameへのデータの読み込みは、以下の例のようにread
メソッドで実行できます。read
は、DataFrameReader
クラスのメソッドとして実装されています。
以下の例では、上記のwrite
で出力したファイルを読み込んで表示しています。
from pyspark.sql import SparkSession # SparkSessionの初期化 spark = SparkSession.builder.appName("DataFrameInputOutput").getOrCreate() # CSVファイルの読み込み df_csv = (spark.read .option("header", "true") .option("inferSchema", "true") .csv("./output_csv")) df_csv.printSchema() df_csv.show() # Parquetファイルへの書き込み df_parquet = (spark.read .parquet("./output_parquet")) df_parquet.printSchema() df_parquet.show() # ORCファイルへの書き込み df_orc = (spark.read .orc("./output_orc")) df_orc.printSchema() df_orc.show() # JSONファイルへの書き込み df_json = (spark.read .json("./output_json")) df_json.printSchema() df_json.show() # SparkSessionを終了 spark.stop()
【実行結果】 root |-- name: string (nullable = true) |-- age: integer (nullable = true) +------+---+ | name|age| +------+---+ |Hanako| 30| | Taro| 25| | Yuki| 20| +------+---+ root |-- name: string (nullable = true) |-- age: integer (nullable = true) +------+---+ | name|age| +------+---+ |Hanako| 30| | Taro| 25| | Yuki| 20| +------+---+ root |-- name: string (nullable = true) |-- age: integer (nullable = true) +------+---+ | name|age| +------+---+ |Hanako| 30| | Taro| 25| | Yuki| 20| +------+---+ root |-- age: long (nullable = true) |-- name: string (nullable = true) +---+------+ |age| name| +---+------+ | 30|Hanako| | 25| Taro| | 20| Yuki| +---+------+
SparkSessionの部分の説明は、上記で説明しているため割愛します。read
は、SparkSessionを介してspark.read.csv("file_path")
のように使用します。
また、以降で説明するoption
メソッドと併用することで出力の挙動をコントロールできます。
optionメソッド
read
メソッドでは、以下のようにoption
メソッドで指定することができます。
# CSVファイルの読み込み df_csv = (spark.read .option("header", "true") .option("inferSchema", "true") .csv("./output_csv")) df_csv.printSchema() df_csv.show()
上記の例では、option
メソッドで"header"
を"true"
にすることで、ヘッダー行を読み込むように指定をしています。また、CSVファイルはスキーマ情報を持っていないため、"inferSchema"
をoption
で指定することでスキーマ情報を推定しながら読み込むことができます。 スキーマ情報は、printSchema
で確認することができます。
Parquet等のファイルフォーマットはスキーマ情報を含むため、"header"
や"inferSchema"
を指定する必要はありません。
schemaメソッドによるスキーマ指定
StructTypeとStructFieldでスキーマを定義し、以下のようにschema
メソッドで指定することで、スキーマに従った形でデータを読み込むことが可能です。
from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType # SparkSessionの初期化 spark = SparkSession.builder.appName("DataFrameInputOutput").getOrCreate() # スキーマを定義 schema = StructType([ StructField("name", StringType(), True), StructField("age", IntegerType(), True), ]) # CSVファイルの読み込み df_csv = (spark.read .option("header", "true") .schema(schema) .csv("./output_csv")) df_csv.printSchema() df_csv.show() # Parquetファイルへの書き込み df_parquet = (spark.read .schema(schema) .parquet("./output_parquet")) df_parquet.printSchema() df_parquet.show() # ORCファイルへの書き込み df_orc = (spark.read .schema(schema) .orc("./output_orc")) df_orc.printSchema() df_orc.show() # JSONファイルへの書き込み df_json = (spark.read .schema(schema) .json("./output_json")) df_json.printSchema() df_json.show() # SparkSessionを終了 spark.stop()
【実行結果】 root |-- name: string (nullable = true) |-- age: integer (nullable = true) +------+---+ | name|age| +------+---+ |Hanako| 30| | Taro| 25| | Yuki| 20| +------+---+ root |-- name: string (nullable = true) |-- age: integer (nullable = true) +------+---+ | name|age| +------+---+ |Hanako| 30| | Taro| 25| | Yuki| 20| +------+---+ root |-- name: string (nullable = true) |-- age: integer (nullable = true) +------+---+ | name|age| +------+---+ |Hanako| 30| | Taro| 25| | Yuki| 20| +------+---+ root |-- name: string (nullable = true) |-- age: integer (nullable = true) +------+---+ | name|age| +------+---+ |Hanako| 30| | Taro| 25| | Yuki| 20| +------+---+
上記のようにスキーマを指定すると、データを読み込む際にPySparkが各フィールドのデータ型を推測する必要がなくなり、大きなデータセットを扱う際に読み込み時間の短縮が期待ができます。
また、データの正確性の保証や、エラーの早期発見ができますし、コード内にスキーマを明示することにより、他の開発者がコードを読む際に明確になり可読性が高まります。
出力ファイルを1つに統合する方法 coalesce, repartition
上記の例では、出力ファイルが複数になりましたが、それはSparkが分散処理をしているためでした。ファイルの読み込みは、複数ファイルでも問題なく読み込み可能です。
しかし、利用者に提供する場合等、場合によってはファイル数をまとめた方が都合がよい場合もあるため、ファイルを1つに統合する方法についても見ておきましょう。
なお、1つにまとめる操作は、処理ノード間のデータ移動が発生することがあるため、加工処理の途中では不必要に統合することはせずに分割されたファイルのまま扱い、利用者に提供するレベルまでデータが整理された際にまとめるといったことをするのがよいかと思います。
coalesceによる統合
coalesce
メソッドを使うことで、パーティション数を減少させることができ、単一パーティションに統合する操作を行うことができます。これにより、ファイル出力の数をコントロールすることができます。
以下のように複数ファイルに1つのファイルに統合することができます。
from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType # SparkSessionの初期化 spark = SparkSession.builder.appName("DataFrameInputOutput").getOrCreate() # スキーマを定義 schema = StructType([ StructField("name", StringType(), True), StructField("age", IntegerType(), True), ]) # データを作成 data = [("Taro", 25), ("Hanako", 30), ("Yuki", 20)] # スキーマを指定してデータフレームを生成 df = spark.createDataFrame(data, schema=schema) # CSVファイルへの書き込み (df.coalesce(1).write .option("header", "true") .mode("overwrite") .csv("./output_csv_onefile")) # Parquetファイルへの書き込み (df.coalesce(1).write .mode("overwrite") .parquet("./output_parquet_onefile")) # ORCファイルへの書き込み (df.coalesce(1).write .mode("overwrite") .orc("./output_orc_onefile")) # JSONファイルへの書き込み (df.coalesce(1).write .mode("overwrite") .json("./output_json_onefile")) # SparkSessionを終了 spark.stop()
出力結果例 (CSVファイルの場合、他のフォーマットも同様)
coalesce
を使用して出力する際には、df.coalesce(1).write
というようにします。このようにすることで上記結果例のように1つのcsvファイルとして出力することが可能です。
repartitionによる統合
coalesce
と同じようなメソッドとしてrepartition
があります。以下のように複数ファイルに1つのファイルに統合することができます。
from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType # SparkSessionの初期化 spark = SparkSession.builder.appName("DataFrameInputOutput").getOrCreate() # スキーマを定義 schema = StructType([ StructField("name", StringType(), True), StructField("age", IntegerType(), True), ]) # データを作成 data = [("Taro", 25), ("Hanako", 30), ("Yuki", 20)] # スキーマを指定してデータフレームを生成 df = spark.createDataFrame(data, schema=schema) # CSVファイルへの書き込み (df.repartition(1).write .option("header", "true") .mode("overwrite") .csv("./output_csv_onefile")) # Parquetファイルへの書き込み (df.repartition(1).write .mode("overwrite") .parquet("./output_parquet_onefile")) # ORCファイルへの書き込み (df.repartition(1).write .mode("overwrite") .orc("./output_orc_onefile")) # JSONファイルへの書き込み (df.repartition(1).write .mode("overwrite") .json("./output_json_onefile")) # SparkSessionを終了 spark.stop()
出力結果例 (CSVファイルの場合、他のフォーマットも同様)
repartition
を使用して出力する際には、df.repartition(1).write
というようにします。上記のようにrepartition
でも同様にファイルをまとめることが可能です。
coalesceとrepartitionの使い分けの基準
上記の通りcoalesce
とrepartition
という類似するメソッドを紹介しました。しかし、どちらをいつ使えばよいのかという使い分けに悩むかと思いますので説明します。もともと、coalesce
とrepartition
は、動作と使用目的が異なっています。
coalesce
は、パーティション数を減少させるために使用されます。また、シャッフルという処理ノード間のデータ移動が発生する負荷の高い処理を回避するように複数のパーティションを単一パーティションに統合する操作を行います。そのため、大量の小さなパーティションを統合して効率的なサイズのパーティションを作成する場合に適しています。
なお、coalesce
で処理ノード間の移動が全く発生しないということではないので注意してください。例えば、処理ノード1と処理ノード2があり、それぞれに2つのパーティションが存在する場合、coalesce(1)
を使用すると、全てのパーティションを1つにまとめる必要があります。その結果、一部のデータをノード間で移動する必要があります。この場合、coalesce
は、既存パーティションの一部を移動して、他のパーティションと結合する最も効率的な方法を選択します。
また、coalesce
は、パーティションを増やす場合には使用できません。パーティション数を増やす場合には、repartition
の方を使う必要があります。
repartition
は、データをシャッフルして新しいパーティションに再分配します。使い方としては、パーティション数を増やす場合や、特定の列に基づいてデータを再分配する場合に使用します。PySparkでは、処理ノード数に対して適切なパーティション数を持つことで、効率的な処理が可能となります。この調整を行う際にrepartition
を使用するのが適切です。
coalesce
とrepartition
の使い分けの一つの基準として簡単にまとめてみると以下のようになります。
- パーティション数を減少させる場合:
coalesce
- パーティション数を増やす場合:
repartition
- 特定の列に基づいてパーティションを再分割する場合:
repartition
状況によりどちらを選択するべきか検討するようにしてみてください。
まとめ
PySparkで、DataFrameの入出力方法について解説しました。データ分析でよく利用されるCSV、Parquet、ORC、JSONといったファイル形式について紹介しています。
PySparkのDataFrameでのデータの書き込みは、DataFrameWriterクラスにwriteメソッドとして、読み込みはDataFrameReaderクラスのread
メソッドとして実装されています。それぞれのメソッドには、option
等のメソッドを指定することで挙動をコントロールすることが可能です。
また、Sparkを使用するとファイルの出力が複数になるという特徴があります。Sparkでは分散処理環境で処理を実行しており、それぞれの処理ノードの各タスクで各パーティションを処理して書き込みを行っているためです。必要に応じてファイル数を制御するcoalesce
やrepartition
といったメソッドについても紹介しています。
PySparkを用いたデータ処理の流れの中で、ファイルの入出力であるread
とwrite
は非常に重要なものになります。ぜひ使い方をしっかりと理解しましょう。
上記で紹介しているソースコードについてはgithubにて公開しています。参考にしていただければと思います。