【PySpark】DataFrameの作成方法とスキーマ

PySparkでDataFrameを作成する方法について解説します。
目次
DataFrameの作成方法
PySparkのDataFrameは、Sparkを使用してデータ処理を行うためのデータ構造になっています。
PySparkは、分散処理フレームワークであるApache SparkのPython用APIです。SparkとPySparkの概要やSparkアプリケーションの概念については「Apache SparkとPySparkの概要」や「Sparkアプリケーションの概念を理解する」でまとめていますので興味があれば参考にしてください。
本記事では、PySparkのDataFrameの生成方法やスキーマの使い方について紹介します。なお、実行環境としては「PySparkの実行環境をDockerで用意する方法」で説明しているDockerでのJupyter Notebook実行環境を使用しています。
DataFrameの基本的な作成方法
PySparkのDataFrameは、様々な方法で作成をすることができます。以降では、主要なDataFrame作成方法について紹介していきます。
タプルのリストから作成する
PySparkのDataFrameは、以下のようにPythonのタプルのリストから作成することができます。各タプルがデータの各行を表すと思ってもらうと分かりやすいかと思います。
from pyspark.sql import SparkSession
# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()
# データをタプルのリストで用意
data = [("Taro", 25), ("Hanako", 30), ("Yuki", 20)]
# DataFrameを生成
df = spark.createDataFrame(data, ["name", "age"])
# DataFrameの内容を表示
df.show()
# SparkSessionを終了
spark.stop()【実行結果】 +------+---+ | name|age| +------+---+ | Taro| 25| |Hanako| 30| | Yuki| 20| +------+---+
以下の部分はSparkのセッションを初期化し、最後に終了する部分です。以降の例でも出てきますが説明は省略しますので覚えておいてください。
# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()
...(省略)...
# SparkSessionを終了
spark.stop()Sparkのセッションは、SparkSession.builder.appName("appname").getOrCreate()という部分でセッションを初期化できます。"appname"の部分はSparkアプリケーションの名前です。なお、SparkSessionを終了する際は、stop()を使用します。
さて、DataFrameの作成方法に戻りましょう。上記例では、データをタプルのリストとしてdataを用意しています。各タプルの値が、データの各行と思ってもらえると分かりやすいと思います。
DataFrameは、createDataFrameで作成します。引数として、用意したdataと各列の名称のリストを渡しています。この例では、name列とage列を持っていることになります。
最後に、DataFrameの内容をshowで表示して内容を確認しています。以上のように、PySparkのDataFrameを作成することが可能です。
辞書のリストから作成する
PySparkのDataFrameは、以下のようにPythonの辞書のリストから作成することができます。
from pyspark.sql import SparkSession
# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()
# データを辞書のリストで用意
data = [{"name": "Taro", "age": 25}, {"name": "Hanako", "age": 30}, {"name": "Yuki", "age": 20}]
# DataFrameを生成
df = spark.createDataFrame(data)
# DataFrameの内容を表示
df.show()
# SparkSessionを終了
spark.stop()【実行結果】 +---+------+ |age| name| +---+------+ | 25| Taro| | 30|Hanako| | 20| Yuki| +---+------+
上記例では、データを辞書のキーが列名として使用されます。タプルのリストを使用する場合には、createDataFrameの際に列名のリストを指定する必要がありましたが、辞書のリストを使用する場合、各キーが列名として扱われるため、dataだけを渡すだけでPySparkのDataFrameが作成できます。
NumPy配列から作成する
PySparkのDataFrameは、以下のようにNumpyの配列(ndarray)から作成することができます。
from pyspark.sql import SparkSession
import numpy as np
# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()
# データをnumpyのndarrayで用意
array = np.array([("Taro", 25), ("Hanako", 30), ("Yuki", 20)])
# DataFrameを生成
df = spark.createDataFrame(array, ["name", "age"])
# DataFrameの内容を表示
df.show()
# SparkSessionを終了
spark.stop()【実行結果】 +------+---+ | name|age| +------+---+ | Taro| 25| |Hanako| 30| | Yuki| 20| +------+---+
上記例のように、NumPy配列を使用する場合は、Pythonのタプルのリストを使用する場合と同じです。createDataFrameの際に、NumPy配列のarrayと列名のリストを渡して、PySparkのDataFrameを作成します。
pandasのDataFrameから作成する
PySparkのDataFrameは、以下のようにpandasのDataFrameから作成することが可能です。
from pyspark.sql import SparkSession
import pandas as pd
# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()
# データをpandasのDataFrameで用意
pandas_df = pd.DataFrame({"name": ["Taro", "Hanako", "Yuki"], "age": [25, 30, 20]})
# DataFrameを生成
df = spark.createDataFrame(pandas_df)
# DataFrameの内容を表示
df.show()
# SparkSessionを終了
spark.stop()【実行結果】 +------+---+ | name|age| +------+---+ | Taro| 25| |Hanako| 30| | Yuki| 20| +------+---+
上記例のように、pandasのDataFrameを使用する場合は、Pythonの辞書のリストを使用する場合と同じです。createDataFrameの際に、pandasのDataFrameであるpandas_dfを渡して、PySparkのDataFrameを作成します。
pandasのDataFrameは、PySparkのDataFrameと似ていますが、似て非なるものであるため注意が必要です。
pandasのDataFrameの方が、使いやすいメソッドが多いといったことがありますが、pandasのDataFrameでは、単一ノードでの処理になってしまい適切に分散がされなくなります。
RDD (Resilient Distributed Dataset) から作成する
RDD(Resilient Distributed Dataset)は、Sparkでの主要なデータ構造です。PySparkで利用できるDataFrameは、RDDの上に構築された高レベルのAPIとなっていて、土台としてはRDDとなっています。
DataFrameとRDDのどちらのAPIを使うかという選択がありますが、DataFrameは、Sparkの最適化エンジンであるCatalystにより自動的にクエリ最適化が行われる点や、多くのユーザーにとって使いやすいとされるAPIが提供されているため、基本的にはDataFrameの使用が推奨されます。
ただし、特定の要件で低レベルの操作や細かな制御が必要な場合は、RDDを直接使用する場合もありますので、RDDからDataFrameを作成する方法を見ておきましょう。
PySparkのDataFrameを、RDDから作成する場合は以下のようにします。
from pyspark.sql import SparkSession
from pyspark.sql import Row
# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()
# データをRDD(Resilient Distributed Dataset)で用意
rdd = spark.sparkContext.parallelize(
[Row(name="Taro", age=25), Row(name="Hanako", age=30), Row(name="Yuki", age=20)]
)
# DataFrameを生成
df = spark.createDataFrame(rdd)
# DataFrameの内容を表示
df.show()
# SparkSessionを終了
spark.stop()【実行結果】 +------+---+ | name|age| +------+---+ | Taro| 25| |Hanako| 30| | Yuki| 20| +------+---+
RDDは、parallelizeで作成しており、行の定義としてRowというクラスを使用しています。
RDDから作成するといっても、これまでの例と同様です。createDataFrameの引数として、RDDのデータであるrddを渡して、PySparkのDataFrameを作成します。
スキーマを指定した作成方法
PySparkのDataFrameでは、各列がどういった型であるかを指定しなくても使用することは可能です。これは、動的型付けとしてPySparkが型をデータの内容から決めてくれているためです。
この方法では、型付けの際に誤った型として判断してしまうと後の処理で困る場合があります。例えば、日付列として扱いたいのに文字列として扱われて、日付としての処理がうまくできないといったことも考えられます。
PySparkでは、スキーマととしてデータの構造を指定したうえでDataFrameを作成することができます。以降では、スキーマを指定したDataFrameの作成方法を紹介します。
スキーマを指定した作成方法の基本
PySparkのDataFrameを作成する際にスキーマを指定する場合には、以下のようにStructTypeやStructFieldを使ってスキーマを定義してから読み込みます。
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameWithSchema").getOrCreate()
# スキーマを定義
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
])
# データを作成
data = [("Taro", 25), ("Hanako", 30), ("Yuki", 20)]
# スキーマを指定してデータフレームを生成
df = spark.createDataFrame(data, schema=schema)
# DataFrameの内容を表示
df.show()
# SparkSessionを終了
spark.stop()【実行結果】 +------+---+ | name|age| +------+---+ | Taro| 25| |Hanako| 30| | Yuki| 20| +------+---+
上記例のようにDataFrameの作成前に、StructTypeとStructFieldを使ってスキーマ(schema)を定義します。StructTypeとStructFieldは、pyspark.sql.typesからインポートします。
各列の定義は同じくpyspark.sql.typesから必要な型定義をインポートします。上記例では文字列のStringTypeと数値のIntegerTypeを使用しています。
StructFieldの引数として、第1引数に列名、第2引数に型(StringType()やIntegerType()等)を指定します。第3引数のTrueは、その列がNullを取り得るかを示します。Trueの場合は、当該列はNullを取り得ますが、Falseの場合はNullを含むことを許しません。
これらのStructFieldのリストをStructTypeに渡すことで、スキーマが作成できます。
スキーマを使用してPySparkのDataFrameを作成する場合には、createDataFrameのschema引数に上記の通り作成したスキーマを指定します。これによりスキーマの定義に従ってDataFrameが作成されます。
スキーマでNullチェックをする場合
上記でStructFieldの第3引数は、その列がNullを取り得るかを示すと説明しました。この項目をFalseに設定した際には、Nullを許可しないため以下の例のように例外となります。
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameNullCheck").getOrCreate()
# スキーマを定義
schema = StructType([
StructField("name", StringType(), False),
StructField("age", IntegerType(), True),
])
# データを作成
data = [(None, 25), ("Hanako", 30), ("Yuki", 20)]
# スキーマを指定してデータフレームを生成
df = spark.createDataFrame(data, schema=schema)
# DataFrameの内容を表示
df.show()
# SparkSessionを終了
spark.stop()【実行結果】 --------------------------------------------------------------------------- PySparkValueError Traceback (most recent call last) ...(途中省略)... PySparkValueError: [CANNOT_BE_NONE] Argument `obj` can not be None.
スキーマで指定可能なデータ型
スキーマを指定する際にStructFieldで指定できるデータ型は、PySparkのpyspark.sql.typesモジュールに定義されています。主なデータ型の一部は以下になります。
スキーマを検討する際に参考にしていただければと思います。なお、PySparkのデータ型の詳細は、公式ドキュメントのこちらを参照してください。
| データ型 | 概要 |
|---|---|
| ByteType | バイト型 |
| ShortType | short整数型 |
| IntegerType | int整数型 |
| LongType | long整数型 |
| FloatType | 単精度浮動小数点型 |
| DoubleType | 倍精度浮動小数点型 |
| DecimalType | 10進数型 |
| StringType | 文字列型 |
| BinaryType | バイナリ型 |
| BooleanType | ブール型 |
| TimestampType | タイムスタンプ型 |
| DateType | 日付型 |
| NullType | Nullのみの型 |
まとめ
PySparkでDataFrameを作成する方法について解説しました。PySparkは、分散処理フレームワークであるApache SparkのPython用APIです。
本記事では、Pythonのデータ、NumPyやpandasのデータからPySparkのDataFrameを作成する基本的な方法や、StructTypeやStructFieldを使ってスキーマを事前に定義しておいてDataFrameを作成する方法について紹介しています。
DataFrameは、PySparkでプログラミングする際の中心的なものとなっています。ぜひ、作成方法をしっかりと理解して使えるようにしていただければと思います。
上記で紹介しているソースコードについてはgithubにて公開しています。参考にしていただければと思います。







