PySpark

【PySpark】DataFrameの入出力 read, write

【PySpark】DataFrameの入出力 read write
naoki-hn

PySpark での DataFrame の入出力方法を解説します。

DataFrameの入出力

PySpark は、分散処理フレームワーク Apache Spark の Python 用 API です。PySpark では、DataFrame というデータ構造を使用します。

この記事では、PySpark で DataFrame を入出力する方法を紹介します。データフォーマットとしては分析でよく使用される CSV、Parquet、ORC、JSON といった形式を扱います。

実行環境は、Docker で構築した Spark 環境の Jupyter Notebook を使用します。環境構築方法は「PySparkの実行環境をDockerで用意する方法」を参考にしてください。

Spark と PySpark 概要や Spark アプリケーションの概念は「Apache SparkとPySparkの概要」や「Sparkアプリケーションの概念を理解する」を参考にしてください。

ファイル形式

今回の入出力で扱うファイル形式 (CSV、Parquet、ORC、JSON) について説明してます。各ファイルに関して詳しい人は読み飛ばしていただいて結構です。

CSV ファイル

CSV (Comma-Separated Values) ファイルは、データをテキストとして表現するシンプルなフォーマットの 1 つです。名前の通り、CSV ファイルのデータは「, (コンマ)」で区切られた値で構成されます。このフォーマットは、データベースやスプレッドシートからのデータのエクスポートやインポートに非常によく使われます。

この記事では、PySpark での入出力を紹介しますが、Python 標準や pandas での CSV 入出力は以下を参考にしてください。

【Python】CSVファイルの入出力

【pandas】CSVファイルの入出力

Parquet ファイル

Parquet ファイルは、データ分析でよく使用される列指向のストレージフォーマットです。大量のデータの効率的な読み取り、書き込み、および圧縮をサポートするため、大規模なデータ分析に適しています。

Parquet ファイルは、バイナリフォーマットのため直接参照できませんが、データ加工の処理フローでは Parquet で効率よく処理し、データの利用ユーザーに公開する際に CSV ファイルにして提供するといったことも行われます。

この記事では、PySpark での入出力を紹介しますが、pandas での Parquet 入出力は以下を参考にしてください。

【pandas】Parquetファイルの入出力

ORCファイル

ORC (Optimized Row Columnar) ファイルは、主に Hadoop エコシステムで使用される列指向のストレージフォーマットの 1 つです。ORC は特に Hive のために設計されており、大量データの効率的な読み取り、書き込み、圧縮をサポートします。

ORC は、特に Hive クエリのパフォーマンスを向上させることを目的として設計されているため、Hive を使用する場合は Hadoop エコシステムを中心にデータ処理する場合に適しているが多いです。また、他の SQL ベースのデータベースシステムやデータ処理フレームワークでも ORC フォーマットが利用されることがあります。

Parquet と ORC のどちらを選択するかは、使用するツールやエコシステムの要件によって検討するのが適切です。

JSON ファイル

JSON (JavaScript Object Notation) ファイルは、JavaScript の一部として生まれたファイルフォーマットですが、現在は多くのプログラミング言語でサポートされています。また、WebAPI(特に RESTful API)で、クライアントとサーバー間でデータをやり取りする際の標準的な手段として広く利用されています。

この記事では、PySpark での入出力を紹介しますが、json モジュールでの JSON 入出力については以下を参考にしてください。

【Python】JSONファイルの入出力

ファイルの書き込み write

PySpark の DataFrame の書き込みは、write メソッドで実行します。write は、DataFrameWriter クラスのメソッドとして実装されています。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameInputOutput").getOrCreate()

# スキーマを定義
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

# データを作成
data = [("Taro", 25), ("Hanako", 30), ("Yuki", 20)]

# スキーマを指定してデータフレームを生成
df = spark.createDataFrame(data, schema=schema)

# CSVファイルへの書き込み
(df.write
    .option("header", "true")
    .mode("overwrite")
    .csv("./output_csv"))

# Parquetファイルへの書き込み
(df.write
    .mode("overwrite")
    .parquet("./output_parquet"))

# ORCファイルへの書き込み
(df.write
    .mode("overwrite")
    .orc("./output_orc"))

# JSONファイルへの書き込み
(df.write
    .mode("overwrite")
    .json("./output_json"))

# SparkSessionを終了
spark.stop()

出力結果例 (CSV ファイルの場合、他のフォーマットも同様)

pyspark write csv 結果

例では、Spark のセッションを「SparkSession.builder.appName("DataFrameInputOutput").getOrCreate()」で初期化しています。"DataFrameExample" の部分は Spark アプリケーションの名前です。なお、SparkSession を終了する際は stop() を使用します。

書き込みのための write は、書き込み対象の DataFrame を使って df.write.csv("output_path") のように使用します。"output_path" の部分にはファイルを出力するフォルダ名を指定します。なお、フォルダがない場合は作成されます。

Parquet ファイルは parquet メソッド、ORC ファイルは orc メソッド、JSON ファイルは json メソッドというように各ファイルフォーマット名のメソッドが用意されています。また、以降で説明する optionmode といったメソッドと併用することで出力をコントロールできます。

option メソッド

write メソッドでは、option メソッドでオプション指定することができます。

# CSVファイルへの書き込み
(df.write
    .option("header", "true")
    .mode("overwrite")
    .csv("./output_csv"))

例では、option メソッドで "header""true" に設定し、ヘッダー行を出力するようにしています。

Apache Spark の DataFrame の option メソッドは、様々な読み取りや書き込みオペレーションの設定を変更するためのキーと値のペアを指定する際に使用します。このメソッドは、主にファイルを読み込む read やファイルに書き込む write と組み合わせて使用されます。

Parquet や ORC は、スキーマ情報を含むファイルフォーマットのため header の指定は必要ありません。なお、例では DataFrame を作成する際に StructTypeStructField を使ってスキーマを明確に指定しています。

PySpark では必ずしもスキーマを指定しなくても使えますが、スキーマ情報を指定しておくことでデータを適切に扱うことができるため、スキーマ情報を指定することをおすすめします。

"header" 以外にも option には指定できるものがあるため調べてみてください。例えば、CSV ファイル書き込みでは "delimiter" 等のオプションを指定できます。

PySparkでは option や後述する mode といったメソッドを例のようにチェーンでつなげる書き方が一般的に使用されます。改行する際には例のように () で囲っておくと可読性高く記載可能です。

DataFrame の作成とスキーマ定義方法は「DataFrameの作成方法とスキーマ」を参考にしてください。

mode メソッド

write メソッドでは mode メソッドでモード指定ができます。

# CSVファイルへの書き込み
(df.write
    .option("header", "true")
    .mode("overwrite")
    .csv("./output_csv"))

例では "overwrite" を指定してファイルを上書きしています。

mode で指定できる代表的なモードは以下の通りです。

モード概要
overwrite既存のデータを上書きします。指定したパスにデータが存在する場合、それを削除して新しいデータを書き込みます。
append既存のデータに追加します。指定したパスに既にデータが存在する場合、新しいデータをその後に追加します。
ignore既存のデータが存在する場合、何も行わずに操作をスキップします。つまり、新しいデータの書き込みを行いません。
error または errorifexists既存のデータが存在する場合、エラーをスローします。このモードがデフォルトの振る舞いです。

複数ファイルに出力される理由

PySpark による出力すると出力結果例で見たように「part-xxxxx.csv」といった複数ファイルが出力されます。これは、Spark では分散環境で処理をしており、それぞれの処理ノードの各タスクで各パーティションを処理して書き込みを行っているからです。

これらの理由を正確に理解するには Spark の Job やパーティション等の処理の概念を理解しておくことをおすすめします。「Sparkアプリケーションの概念を理解する」で Spark アプリケーションの構成やデータ構造、処理概念についてまとめていますので参考にしてください。

ファイルの読み込み read

PySpark の DataFrame のデータ読み込みは read メソッドで実行します。read は、DataFrameReader クラスのメソッドとして実装されています。

以下例は、上記の write で出力したファイルを読み込んで表示しています。

from pyspark.sql import SparkSession

# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameInputOutput").getOrCreate()

# CSVファイルの読み込み
df_csv = (spark.read
                .option("header", "true")
                .option("inferSchema", "true")
                .csv("./output_csv"))
df_csv.printSchema()
df_csv.show()

# Parquetファイルへの書き込み
df_parquet = (spark.read
                    .parquet("./output_parquet"))
df_parquet.printSchema()
df_parquet.show()

# ORCファイルへの書き込み
df_orc = (spark.read
                .orc("./output_orc"))
df_orc.printSchema()
df_orc.show()

# JSONファイルへの書き込み
df_json = (spark.read
                .json("./output_json"))
df_json.printSchema()
df_json.show()

# SparkSessionを終了
spark.stop()
【実行結果】
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+------+---+
|  name|age|
+------+---+
|Hanako| 30|
|  Taro| 25|
|  Yuki| 20|
+------+---+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+------+---+
|  name|age|
+------+---+
|Hanako| 30|
|  Taro| 25|
|  Yuki| 20|
+------+---+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+------+---+
|  name|age|
+------+---+
|Hanako| 30|
|  Taro| 25|
|  Yuki| 20|
+------+---+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+---+------+
|age|  name|
+---+------+
| 30|Hanako|
| 25|  Taro|
| 20|  Yuki|
+---+------+

SparkSession の部分の説明は、上記で説明しているため割愛します。read は、SparkSession を介して spark.read.csv("file_path") のように使用します。

また、option メソッドと併用することで読み込みをコントロールできます。

option メソッド

read メソッドでは、option メソッドでオプション指定することができます。

# CSVファイルの読み込み
df_csv = (spark.read
                .option("header", "true")
                .option("inferSchema", "true")
                .csv("./output_csv"))
df_csv.printSchema()
df_csv.show()

例では option メソッドで "header""true" にすることで、ヘッダー行を読み込んでいます。また、CSV ファイルはスキーマ情報を持っていないため "inferSchema"option で指定することでスキーマ情報を推定しながら読み込むことができます。 スキーマ情報は printSchema で確認できます。

Parquet 等のファイルフォーマットはスキーマ情報を含むため "header""inferSchema" を指定する必要はありません。

schema メソッドによるスキーマ指定

StructTypeStructField でスキーマを定義し schema メソッドで指定することで、スキーマに従った形でデータを読み込むことができます。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameInputOutput").getOrCreate()

# スキーマを定義
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

# CSVファイルの読み込み
df_csv = (spark.read
                .option("header", "true")
                .schema(schema)
                .csv("./output_csv"))
df_csv.printSchema()
df_csv.show()

# Parquetファイルへの書き込み
df_parquet = (spark.read
                    .schema(schema)
                    .parquet("./output_parquet"))
df_parquet.printSchema()
df_parquet.show()

# ORCファイルへの書き込み
df_orc = (spark.read
                .schema(schema)
                .orc("./output_orc"))
df_orc.printSchema()
df_orc.show()

# JSONファイルへの書き込み
df_json = (spark.read
                .schema(schema)
                .json("./output_json"))
df_json.printSchema()
df_json.show()

# SparkSessionを終了
spark.stop()
【実行結果】
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+------+---+
|  name|age|
+------+---+
|Hanako| 30|
|  Taro| 25|
|  Yuki| 20|
+------+---+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+------+---+
|  name|age|
+------+---+
|Hanako| 30|
|  Taro| 25|
|  Yuki| 20|
+------+---+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+------+---+
|  name|age|
+------+---+
|Hanako| 30|
|  Taro| 25|
|  Yuki| 20|
+------+---+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+------+---+
|  name|age|
+------+---+
|Hanako| 30|
|  Taro| 25|
|  Yuki| 20|
+------+---+

スキーマを指定すると、データを読み込む際に PySpark が各フィールドのデータ型を推測する必要がなくなり、大きなデータセットを扱う際には読み込み時間の短縮が期待できます。

また、データの正確性の保証や、エラーの早期発見ができますし、コード内にスキーマを明示することにより、他の開発者がコードを読む際の可読性が高まります。

出力ファイルを 1 つに統合する方法 coalescerepartition

上記例では、出力ファイルが複数になりましたが、それは Spark が分散処理をしているためでした。また、ファイルの読み込み時は、複数ファイルになっていても問題なく読み込みが可能でした。

しかし、利用者に提供する時など、場合によってはファイルをまとめた方が都合がよい場合があります。ファイルを 1 つに統合する方法についても見ておきましょう。

なお、1 つにまとめる操作は、処理ノード間のデータ移動が発生することがあるため、加工処理の途中で実行すると全体の処理時間が長くなってしまいます。そのため、処理途中では分割されたファイルのまま扱い、利用者に提供するレベルまでデータが整理された後にまとめる処理を実行するとよいでしょう。

coalesce による統合

coalesce メソッドは、パーティション数を減少させて、単一パーティションに統合することができます。これにより、ファイル出力数をコントロールできます。

以下のようにすることで、複数ファイルを 1 つのファイルに統合することができます。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameInputOutput").getOrCreate()

# スキーマを定義
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

# データを作成
data = [("Taro", 25), ("Hanako", 30), ("Yuki", 20)]

# スキーマを指定してデータフレームを生成
df = spark.createDataFrame(data, schema=schema)


# CSVファイルへの書き込み
(df.coalesce(1).write
    .option("header", "true")
    .mode("overwrite")
    .csv("./output_csv_onefile"))

# Parquetファイルへの書き込み
(df.coalesce(1).write
    .mode("overwrite")
    .parquet("./output_parquet_onefile"))

# ORCファイルへの書き込み
(df.coalesce(1).write
    .mode("overwrite")
    .orc("./output_orc_onefile"))

# JSONファイルへの書き込み
(df.coalesce(1).write
    .mode("overwrite")
    .json("./output_json_onefile"))

# SparkSessionを終了
spark.stop()

出力結果例 (CSV ファイルの場合、他のフォーマットも同様)

coalesce 出力例

coalesce を使用して出力する際には df.coalesce(1).write というようにします。これにより 1 つの CSV ファイルとして出力可能です。

repartition による統合

coalesce と同じようなメソッドとして repartition があります。以下のようにすることで複数ファイルに 1 つのファイルに統合できます。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameInputOutput").getOrCreate()

# スキーマを定義
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

# データを作成
data = [("Taro", 25), ("Hanako", 30), ("Yuki", 20)]

# スキーマを指定してデータフレームを生成
df = spark.createDataFrame(data, schema=schema)


# CSVファイルへの書き込み
(df.repartition(1).write
    .option("header", "true")
    .mode("overwrite")
    .csv("./output_csv_onefile"))

# Parquetファイルへの書き込み
(df.repartition(1).write
    .mode("overwrite")
    .parquet("./output_parquet_onefile"))

# ORCファイルへの書き込み
(df.repartition(1).write
    .mode("overwrite")
    .orc("./output_orc_onefile"))

# JSONファイルへの書き込み
(df.repartition(1).write
    .mode("overwrite")
    .json("./output_json_onefile"))

# SparkSessionを終了
spark.stop()

出力結果例 (CSV ファイルの場合、他のフォーマットも同様)

repartition 出力例

repartition を使用して出力する際には df.repartition(1).write というようにします。上記のように repartition でも同様にファイルをまとめることが可能です。

coalescerepartition の使い分け基準

上記の通り coalescerepartition という類似するメソッドを紹介しました。どちらをいつ使うかという考え方について説明します。もともと、coalescerepartition は、動作と使用目的が異なっています。

coalesce は、パーティション数を減少させるために使用します。また、coalesce はシャッフルという処理ノード間のデータ移動が発生する負荷の高い処理を回避するように複数パーティションを単一パーティションに統合します。そのため、大量の小さなパーティションを統合して効率的なサイズのパーティションを作成する際に適します。

なお、coalesce で処理ノード間の移動が全く発生しないということではないので注意してください。例えば、処理ノード 1 と処理ノード 2 があり、それぞれに 2 つのパーティションが存在する場合、coalesce(1) を使用すると、全てのパーティションを 1 つにまとめる必要があります。このような場合は、データをノード間で移動する必要があります。この場合、coalesceは、既存パーティションを移動して、他のパーティションと結合する最も効率的な方法を選択します。

また、coalesce は、パーティションを増やす場合には使用できません。パーティション数を増やす場合には、repartition の方を使う必要があります。

一方で、repartition は、データをシャッフルして新しいパーティションに再分配します。使い方としては、パーティション数を増やす場合や、特定の列に基づいてデータを再分配する場合に使用します。PySpark では、処理ノード数に対して適切なパーティション数を持つことで効率的な処理が可能となります。このような調整を行う際には repartition を使用するのが適切です。

coalescerepartition の使い分けの基準は以下のような考え方ができます。

  • パーティション数を減少させる場合:coalesce
  • パーティション数を増やす場合:repartition
  • 特定の列に基づいてパーティションを再分割する場合:repartition

状況によりどちらを選択するべきか十分に検討してみてください。

まとめ

PySpark での DataFrame の入出力方法について解説しました。データ分析でよく利用される CSV、Parquet、ORC、JSON といったファイル形式を扱いました。

PySpark の DataFrame でのデータの書き込みは write メソッドとして、読み込みは read メソッドとして実装されています。それぞれのメソッドでは option 等のメソッドを追加で指定することで挙動をコントロールできます。

また、Spark を使用するとファイルの出力が複数になるという特徴があります。Spark では分散処理環境で処理を実行しており、それぞれの処理ノードの各タスクで各パーティションを処理して書き込みを行っているためです。必要に応じてファイル数を制御する coalescerepartition といったメソッドについても紹介しました。

PySpark を用いたデータ処理の流れの中で、ファイルの入出力である readwrite は非常に重要です。ぜひ使い方をしっかりと理解しましょう。

ソースコード

上記で紹介しているソースコードについては GitHub にて公開しています。参考にしていただければと思います。

あわせて読みたい
【Python Tech】プログラミングガイド
【Python Tech】プログラミングガイド

ABOUT ME
ホッシー
ホッシー
システムエンジニア
はじめまして。当サイトをご覧いただきありがとうございます。 私は製造業のメーカーで、DX推進や業務システムの設計・開発・導入を担当しているシステムエンジニアです。これまでに転職も経験しており、以前は大手電機メーカーでシステム開発に携わっていました。

プログラミング言語はこれまでC、C++、JAVA等を扱ってきましたが、最近では特に機械学習等の分析でも注目されているPythonについてとても興味をもって取り組んでいます。これまでの経験をもとに、Pythonに興味を持つ方のお役に立てるような情報を発信していきたいと思います。どうぞよろしくお願いいたします。

※キャラクターデザイン:ゼイルン様
記事URLをコピーしました