【PySpark】DataFrameの入出力 read, write

PySpark での DataFrame の入出力方法を解説します。
目次
DataFrameの入出力
PySpark は、分散処理フレームワーク Apache Spark の Python 用 API です。PySpark では、DataFrame というデータ構造を使用します。
この記事では、PySpark で DataFrame を入出力する方法を紹介します。データフォーマットとしては分析でよく使用される CSV、Parquet、ORC、JSON といった形式を扱います。
ファイル形式
今回の入出力で扱うファイル形式 (CSV、Parquet、ORC、JSON) について説明してます。各ファイルに関して詳しい人は読み飛ばしていただいて結構です。
CSV ファイル
CSV (Comma-Separated Values) ファイルは、データをテキストとして表現するシンプルなフォーマットの 1 つです。名前の通り、CSV ファイルのデータは「, (コンマ)」で区切られた値で構成されます。このフォーマットは、データベースやスプレッドシートからのデータのエクスポートやインポートに非常によく使われます。
Parquet ファイル
Parquet ファイルは、データ分析でよく使用される列指向のストレージフォーマットです。大量のデータの効率的な読み取り、書き込み、および圧縮をサポートするため、大規模なデータ分析に適しています。
Parquet ファイルは、バイナリフォーマットのため直接参照できませんが、データ加工の処理フローでは Parquet で効率よく処理し、データの利用ユーザーに公開する際に CSV ファイルにして提供するといったことも行われます。
ORCファイル
ORC (Optimized Row Columnar) ファイルは、主に Hadoop エコシステムで使用される列指向のストレージフォーマットの 1 つです。ORC は特に Hive のために設計されており、大量データの効率的な読み取り、書き込み、圧縮をサポートします。
ORC は、特に Hive クエリのパフォーマンスを向上させることを目的として設計されているため、Hive を使用する場合は Hadoop エコシステムを中心にデータ処理する場合に適しているが多いです。また、他の SQL ベースのデータベースシステムやデータ処理フレームワークでも ORC フォーマットが利用されることがあります。
Parquet と ORC のどちらを選択するかは、使用するツールやエコシステムの要件によって検討するのが適切です。
JSON ファイル
JSON (JavaScript Object Notation) ファイルは、JavaScript の一部として生まれたファイルフォーマットですが、現在は多くのプログラミング言語でサポートされています。また、WebAPI(特に RESTful API)で、クライアントとサーバー間でデータをやり取りする際の標準的な手段として広く利用されています。
ファイルの書き込み write
PySpark の DataFrame の書き込みは、write メソッドで実行します。write は、DataFrameWriter クラスのメソッドとして実装されています。
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameInputOutput").getOrCreate()
# スキーマを定義
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
])
# データを作成
data = [("Taro", 25), ("Hanako", 30), ("Yuki", 20)]
# スキーマを指定してデータフレームを生成
df = spark.createDataFrame(data, schema=schema)
# CSVファイルへの書き込み
(df.write
.option("header", "true")
.mode("overwrite")
.csv("./output_csv"))
# Parquetファイルへの書き込み
(df.write
.mode("overwrite")
.parquet("./output_parquet"))
# ORCファイルへの書き込み
(df.write
.mode("overwrite")
.orc("./output_orc"))
# JSONファイルへの書き込み
(df.write
.mode("overwrite")
.json("./output_json"))
# SparkSessionを終了
spark.stop()出力結果例 (CSV ファイルの場合、他のフォーマットも同様)

例では、Spark のセッションを「SparkSession.builder.appName("DataFrameInputOutput").getOrCreate()」で初期化しています。"DataFrameExample" の部分は Spark アプリケーションの名前です。なお、SparkSession を終了する際は stop() を使用します。
書き込みのための write は、書き込み対象の DataFrame を使って df.write.csv("output_path") のように使用します。"output_path" の部分にはファイルを出力するフォルダ名を指定します。なお、フォルダがない場合は作成されます。
Parquet ファイルは parquet メソッド、ORC ファイルは orc メソッド、JSON ファイルは json メソッドというように各ファイルフォーマット名のメソッドが用意されています。また、以降で説明する option や mode といったメソッドと併用することで出力をコントロールできます。
option メソッド
write メソッドでは、option メソッドでオプション指定することができます。
# CSVファイルへの書き込み
(df.write
.option("header", "true")
.mode("overwrite")
.csv("./output_csv"))例では、option メソッドで "header" を "true" に設定し、ヘッダー行を出力するようにしています。
Apache Spark の DataFrame の option メソッドは、様々な読み取りや書き込みオペレーションの設定を変更するためのキーと値のペアを指定する際に使用します。このメソッドは、主にファイルを読み込む read やファイルに書き込む write と組み合わせて使用されます。
Parquet や ORC は、スキーマ情報を含むファイルフォーマットのため header の指定は必要ありません。なお、例では DataFrame を作成する際に StructType やStructField を使ってスキーマを明確に指定しています。
PySpark では必ずしもスキーマを指定しなくても使えますが、スキーマ情報を指定しておくことでデータを適切に扱うことができるため、スキーマ情報を指定することをおすすめします。
"header" 以外にも option には指定できるものがあるため調べてみてください。例えば、CSV ファイル書き込みでは "delimiter" 等のオプションを指定できます。
PySparkでは option や後述する mode といったメソッドを例のようにチェーンでつなげる書き方が一般的に使用されます。改行する際には例のように () で囲っておくと可読性高く記載可能です。
mode メソッド
write メソッドでは mode メソッドでモード指定ができます。
# CSVファイルへの書き込み
(df.write
.option("header", "true")
.mode("overwrite")
.csv("./output_csv"))例では "overwrite" を指定してファイルを上書きしています。
mode で指定できる代表的なモードは以下の通りです。
| モード | 概要 |
|---|---|
overwrite | 既存のデータを上書きします。指定したパスにデータが存在する場合、それを削除して新しいデータを書き込みます。 |
append | 既存のデータに追加します。指定したパスに既にデータが存在する場合、新しいデータをその後に追加します。 |
ignore | 既存のデータが存在する場合、何も行わずに操作をスキップします。つまり、新しいデータの書き込みを行いません。 |
error または errorifexists | 既存のデータが存在する場合、エラーをスローします。このモードがデフォルトの振る舞いです。 |
複数ファイルに出力される理由
PySpark による出力すると出力結果例で見たように「part-xxxxx.csv」といった複数ファイルが出力されます。これは、Spark では分散環境で処理をしており、それぞれの処理ノードの各タスクで各パーティションを処理して書き込みを行っているからです。
これらの理由を正確に理解するには Spark の Job やパーティション等の処理の概念を理解しておくことをおすすめします。「Sparkアプリケーションの概念を理解する」で Spark アプリケーションの構成やデータ構造、処理概念についてまとめていますので参考にしてください。
ファイルの読み込み read
PySpark の DataFrame のデータ読み込みは read メソッドで実行します。read は、DataFrameReader クラスのメソッドとして実装されています。
以下例は、上記の write で出力したファイルを読み込んで表示しています。
from pyspark.sql import SparkSession
# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameInputOutput").getOrCreate()
# CSVファイルの読み込み
df_csv = (spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("./output_csv"))
df_csv.printSchema()
df_csv.show()
# Parquetファイルへの書き込み
df_parquet = (spark.read
.parquet("./output_parquet"))
df_parquet.printSchema()
df_parquet.show()
# ORCファイルへの書き込み
df_orc = (spark.read
.orc("./output_orc"))
df_orc.printSchema()
df_orc.show()
# JSONファイルへの書き込み
df_json = (spark.read
.json("./output_json"))
df_json.printSchema()
df_json.show()
# SparkSessionを終了
spark.stop()【実行結果】 root |-- name: string (nullable = true) |-- age: integer (nullable = true) +------+---+ | name|age| +------+---+ |Hanako| 30| | Taro| 25| | Yuki| 20| +------+---+ root |-- name: string (nullable = true) |-- age: integer (nullable = true) +------+---+ | name|age| +------+---+ |Hanako| 30| | Taro| 25| | Yuki| 20| +------+---+ root |-- name: string (nullable = true) |-- age: integer (nullable = true) +------+---+ | name|age| +------+---+ |Hanako| 30| | Taro| 25| | Yuki| 20| +------+---+ root |-- age: long (nullable = true) |-- name: string (nullable = true) +---+------+ |age| name| +---+------+ | 30|Hanako| | 25| Taro| | 20| Yuki| +---+------+
SparkSession の部分の説明は、上記で説明しているため割愛します。read は、SparkSession を介して spark.read.csv("file_path") のように使用します。
また、option メソッドと併用することで読み込みをコントロールできます。
option メソッド
read メソッドでは、option メソッドでオプション指定することができます。
# CSVファイルの読み込み
df_csv = (spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("./output_csv"))
df_csv.printSchema()
df_csv.show()例では option メソッドで "header" を "true" にすることで、ヘッダー行を読み込んでいます。また、CSV ファイルはスキーマ情報を持っていないため "inferSchema" を option で指定することでスキーマ情報を推定しながら読み込むことができます。 スキーマ情報は printSchema で確認できます。
Parquet 等のファイルフォーマットはスキーマ情報を含むため "header" や "inferSchema" を指定する必要はありません。
schema メソッドによるスキーマ指定
StructType と StructField でスキーマを定義し schema メソッドで指定することで、スキーマに従った形でデータを読み込むことができます。
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameInputOutput").getOrCreate()
# スキーマを定義
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
])
# CSVファイルの読み込み
df_csv = (spark.read
.option("header", "true")
.schema(schema)
.csv("./output_csv"))
df_csv.printSchema()
df_csv.show()
# Parquetファイルへの書き込み
df_parquet = (spark.read
.schema(schema)
.parquet("./output_parquet"))
df_parquet.printSchema()
df_parquet.show()
# ORCファイルへの書き込み
df_orc = (spark.read
.schema(schema)
.orc("./output_orc"))
df_orc.printSchema()
df_orc.show()
# JSONファイルへの書き込み
df_json = (spark.read
.schema(schema)
.json("./output_json"))
df_json.printSchema()
df_json.show()
# SparkSessionを終了
spark.stop()【実行結果】 root |-- name: string (nullable = true) |-- age: integer (nullable = true) +------+---+ | name|age| +------+---+ |Hanako| 30| | Taro| 25| | Yuki| 20| +------+---+ root |-- name: string (nullable = true) |-- age: integer (nullable = true) +------+---+ | name|age| +------+---+ |Hanako| 30| | Taro| 25| | Yuki| 20| +------+---+ root |-- name: string (nullable = true) |-- age: integer (nullable = true) +------+---+ | name|age| +------+---+ |Hanako| 30| | Taro| 25| | Yuki| 20| +------+---+ root |-- name: string (nullable = true) |-- age: integer (nullable = true) +------+---+ | name|age| +------+---+ |Hanako| 30| | Taro| 25| | Yuki| 20| +------+---+
スキーマを指定すると、データを読み込む際に PySpark が各フィールドのデータ型を推測する必要がなくなり、大きなデータセットを扱う際には読み込み時間の短縮が期待できます。
また、データの正確性の保証や、エラーの早期発見ができますし、コード内にスキーマを明示することにより、他の開発者がコードを読む際の可読性が高まります。
出力ファイルを 1 つに統合する方法 coalesce、repartition
上記例では、出力ファイルが複数になりましたが、それは Spark が分散処理をしているためでした。また、ファイルの読み込み時は、複数ファイルになっていても問題なく読み込みが可能でした。
しかし、利用者に提供する時など、場合によってはファイルをまとめた方が都合がよい場合があります。ファイルを 1 つに統合する方法についても見ておきましょう。
なお、1 つにまとめる操作は、処理ノード間のデータ移動が発生することがあるため、加工処理の途中で実行すると全体の処理時間が長くなってしまいます。そのため、処理途中では分割されたファイルのまま扱い、利用者に提供するレベルまでデータが整理された後にまとめる処理を実行するとよいでしょう。
coalesce による統合
coalesce メソッドは、パーティション数を減少させて、単一パーティションに統合することができます。これにより、ファイル出力数をコントロールできます。
以下のようにすることで、複数ファイルを 1 つのファイルに統合することができます。
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameInputOutput").getOrCreate()
# スキーマを定義
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
])
# データを作成
data = [("Taro", 25), ("Hanako", 30), ("Yuki", 20)]
# スキーマを指定してデータフレームを生成
df = spark.createDataFrame(data, schema=schema)
# CSVファイルへの書き込み
(df.coalesce(1).write
.option("header", "true")
.mode("overwrite")
.csv("./output_csv_onefile"))
# Parquetファイルへの書き込み
(df.coalesce(1).write
.mode("overwrite")
.parquet("./output_parquet_onefile"))
# ORCファイルへの書き込み
(df.coalesce(1).write
.mode("overwrite")
.orc("./output_orc_onefile"))
# JSONファイルへの書き込み
(df.coalesce(1).write
.mode("overwrite")
.json("./output_json_onefile"))
# SparkSessionを終了
spark.stop()出力結果例 (CSV ファイルの場合、他のフォーマットも同様)

coalesce を使用して出力する際には df.coalesce(1).write というようにします。これにより 1 つの CSV ファイルとして出力可能です。
repartition による統合
coalesce と同じようなメソッドとして repartition があります。以下のようにすることで複数ファイルに 1 つのファイルに統合できます。
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameInputOutput").getOrCreate()
# スキーマを定義
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
])
# データを作成
data = [("Taro", 25), ("Hanako", 30), ("Yuki", 20)]
# スキーマを指定してデータフレームを生成
df = spark.createDataFrame(data, schema=schema)
# CSVファイルへの書き込み
(df.repartition(1).write
.option("header", "true")
.mode("overwrite")
.csv("./output_csv_onefile"))
# Parquetファイルへの書き込み
(df.repartition(1).write
.mode("overwrite")
.parquet("./output_parquet_onefile"))
# ORCファイルへの書き込み
(df.repartition(1).write
.mode("overwrite")
.orc("./output_orc_onefile"))
# JSONファイルへの書き込み
(df.repartition(1).write
.mode("overwrite")
.json("./output_json_onefile"))
# SparkSessionを終了
spark.stop()出力結果例 (CSV ファイルの場合、他のフォーマットも同様)

repartition を使用して出力する際には df.repartition(1).write というようにします。上記のように repartition でも同様にファイルをまとめることが可能です。
coalesce と repartition の使い分け基準
上記の通り coalesce と repartition という類似するメソッドを紹介しました。どちらをいつ使うかという考え方について説明します。もともと、coalesce と repartition は、動作と使用目的が異なっています。
coalesce は、パーティション数を減少させるために使用します。また、coalesce はシャッフルという処理ノード間のデータ移動が発生する負荷の高い処理を回避するように複数パーティションを単一パーティションに統合します。そのため、大量の小さなパーティションを統合して効率的なサイズのパーティションを作成する際に適します。
なお、coalesce で処理ノード間の移動が全く発生しないということではないので注意してください。例えば、処理ノード 1 と処理ノード 2 があり、それぞれに 2 つのパーティションが存在する場合、coalesce(1) を使用すると、全てのパーティションを 1 つにまとめる必要があります。このような場合は、データをノード間で移動する必要があります。この場合、coalesceは、既存パーティションを移動して、他のパーティションと結合する最も効率的な方法を選択します。
また、coalesce は、パーティションを増やす場合には使用できません。パーティション数を増やす場合には、repartition の方を使う必要があります。
一方で、repartition は、データをシャッフルして新しいパーティションに再分配します。使い方としては、パーティション数を増やす場合や、特定の列に基づいてデータを再分配する場合に使用します。PySpark では、処理ノード数に対して適切なパーティション数を持つことで効率的な処理が可能となります。このような調整を行う際には repartition を使用するのが適切です。
coalesce と repartition の使い分けの基準は以下のような考え方ができます。
- パーティション数を減少させる場合:
coalesce - パーティション数を増やす場合:
repartition - 特定の列に基づいてパーティションを再分割する場合:
repartition
状況によりどちらを選択するべきか十分に検討してみてください。
まとめ
PySpark での DataFrame の入出力方法について解説しました。データ分析でよく利用される CSV、Parquet、ORC、JSON といったファイル形式を扱いました。
PySpark の DataFrame でのデータの書き込みは write メソッドとして、読み込みは read メソッドとして実装されています。それぞれのメソッドでは option 等のメソッドを追加で指定することで挙動をコントロールできます。
また、Spark を使用するとファイルの出力が複数になるという特徴があります。Spark では分散処理環境で処理を実行しており、それぞれの処理ノードの各タスクで各パーティションを処理して書き込みを行っているためです。必要に応じてファイル数を制御する coalesce や repartition といったメソッドについても紹介しました。
PySpark を用いたデータ処理の流れの中で、ファイルの入出力である read と write は非常に重要です。ぜひ使い方をしっかりと理解しましょう。
上記で紹介しているソースコードについては GitHub にて公開しています。参考にしていただければと思います。







