PySparkでDataFrameを作成する方法について解説します。
Contents
DataFrameの作成方法
PySparkのDataFrameは、Sparkを使用してデータ処理を行うためのデータ構造になっています。
PySparkは、分散処理フレームワークであるApache SparkのPython用APIです。SparkとPySparkの概要やSparkアプリケーションの概念については「Apache SparkとPySparkの概要」や「Sparkアプリケーションの概念を理解する」でまとめていますので興味があれば参考にしてください。
本記事では、PySparkのDataFrameの生成方法やスキーマの使い方について紹介します。なお、実行環境としては「PySparkの実行環境をDockerで用意する方法」で説明しているDockerでのJupyter Notebook実行環境を使用しています。
DataFrameの基本的な作成方法
PySparkのDataFrameは、様々な方法で作成をすることができます。以降では、主要なDataFrame作成方法について紹介していきます。
タプルのリストから作成する
PySparkのDataFrameは、以下のようにPythonのタプルのリストから作成することができます。各タプルがデータの各行を表すと思ってもらうと分かりやすいかと思います。
from pyspark.sql import SparkSession # SparkSessionの初期化 spark = SparkSession.builder.appName("DataFrameExample").getOrCreate() # データをタプルのリストで用意 data = [("Taro", 25), ("Hanako", 30), ("Yuki", 20)] # DataFrameを生成 df = spark.createDataFrame(data, ["name", "age"]) # DataFrameの内容を表示 df.show() # SparkSessionを終了 spark.stop()
【実行結果】 +------+---+ | name|age| +------+---+ | Taro| 25| |Hanako| 30| | Yuki| 20| +------+---+
以下の部分はSparkのセッションを初期化し、最後に終了する部分です。以降の例でも出てきますが説明は省略しますので覚えておいてください。
# SparkSessionの初期化 spark = SparkSession.builder.appName("DataFrameExample").getOrCreate() ...(省略)... # SparkSessionを終了 spark.stop()
Sparkのセッションは、SparkSession.builder.appName("appname").getOrCreate()
という部分でセッションを初期化できます。"appname"
の部分はSparkアプリケーションの名前です。なお、SparkSessionを終了する際は、stop()
を使用します。
さて、DataFrameの作成方法に戻りましょう。上記例では、データをタプルのリストとしてdata
を用意しています。各タプルの値が、データの各行と思ってもらえると分かりやすいと思います。
DataFrameは、createDataFrameで作成します。引数として、用意したdata
と各列の名称のリストを渡しています。この例では、name列とage列を持っていることになります。
最後に、DataFrameの内容をshowで表示して内容を確認しています。以上のように、PySparkのDataFrameを作成することが可能です。
辞書のリストから作成する
PySparkのDataFrameは、以下のようにPythonの辞書のリストから作成することができます。
from pyspark.sql import SparkSession # SparkSessionの初期化 spark = SparkSession.builder.appName("DataFrameExample").getOrCreate() # データを辞書のリストで用意 data = [{"name": "Taro", "age": 25}, {"name": "Hanako", "age": 30}, {"name": "Yuki", "age": 20}] # DataFrameを生成 df = spark.createDataFrame(data) # DataFrameの内容を表示 df.show() # SparkSessionを終了 spark.stop()
【実行結果】 +---+------+ |age| name| +---+------+ | 25| Taro| | 30|Hanako| | 20| Yuki| +---+------+
上記例では、データを辞書のキーが列名として使用されます。タプルのリストを使用する場合には、createDataFrameの際に列名のリストを指定する必要がありましたが、辞書のリストを使用する場合、各キーが列名として扱われるため、data
だけを渡すだけでPySparkのDataFrameが作成できます。
NumPy配列から作成する
PySparkのDataFrameは、以下のようにNumpyの配列(ndarray
)から作成することができます。
from pyspark.sql import SparkSession import numpy as np # SparkSessionの初期化 spark = SparkSession.builder.appName("DataFrameExample").getOrCreate() # データをnumpyのndarrayで用意 array = np.array([("Taro", 25), ("Hanako", 30), ("Yuki", 20)]) # DataFrameを生成 df = spark.createDataFrame(array, ["name", "age"]) # DataFrameの内容を表示 df.show() # SparkSessionを終了 spark.stop()
【実行結果】 +------+---+ | name|age| +------+---+ | Taro| 25| |Hanako| 30| | Yuki| 20| +------+---+
上記例のように、NumPy配列を使用する場合は、Pythonのタプルのリストを使用する場合と同じです。createDataFrame
の際に、NumPy配列のarray
と列名のリストを渡して、PySparkのDataFrameを作成します。
pandasのDataFrameから作成する
PySparkのDataFrameは、以下のようにpandasのDataFrameから作成することが可能です。
from pyspark.sql import SparkSession import pandas as pd # SparkSessionの初期化 spark = SparkSession.builder.appName("DataFrameExample").getOrCreate() # データをpandasのDataFrameで用意 pandas_df = pd.DataFrame({"name": ["Taro", "Hanako", "Yuki"], "age": [25, 30, 20]}) # DataFrameを生成 df = spark.createDataFrame(pandas_df) # DataFrameの内容を表示 df.show() # SparkSessionを終了 spark.stop()
【実行結果】 +------+---+ | name|age| +------+---+ | Taro| 25| |Hanako| 30| | Yuki| 20| +------+---+
上記例のように、pandasのDataFrameを使用する場合は、Pythonの辞書のリストを使用する場合と同じです。createDataFrame
の際に、pandasのDataFrameであるpandas_df
を渡して、PySparkのDataFrameを作成します。
pandasのDataFrameは、PySparkのDataFrameと似ていますが、似て非なるものであるため注意が必要です。
pandasのDataFrameの方が、使いやすいメソッドが多いといったことがありますが、pandasのDataFrameでは、単一ノードでの処理になってしまい適切に分散がされなくなります。
RDD (Resilient Distributed Dataset) から作成する
RDD(Resilient Distributed Dataset)は、Sparkでの主要なデータ構造です。PySparkで利用できるDataFrameは、RDDの上に構築された高レベルのAPIとなっていて、土台としてはRDDとなっています。
DataFrameとRDDのどちらのAPIを使うかという選択がありますが、DataFrameは、Sparkの最適化エンジンであるCatalystにより自動的にクエリ最適化が行われる点や、多くのユーザーにとって使いやすいとされるAPIが提供されているため、基本的にはDataFrameの使用が推奨されます。
ただし、特定の要件で低レベルの操作や細かな制御が必要な場合は、RDDを直接使用する場合もありますので、RDDからDataFrameを作成する方法を見ておきましょう。
PySparkのDataFrameを、RDDから作成する場合は以下のようにします。
from pyspark.sql import SparkSession from pyspark.sql import Row # SparkSessionの初期化 spark = SparkSession.builder.appName("DataFrameExample").getOrCreate() # データをRDD(Resilient Distributed Dataset)で用意 rdd = spark.sparkContext.parallelize( [Row(name="Taro", age=25), Row(name="Hanako", age=30), Row(name="Yuki", age=20)] ) # DataFrameを生成 df = spark.createDataFrame(rdd) # DataFrameの内容を表示 df.show() # SparkSessionを終了 spark.stop()
【実行結果】 +------+---+ | name|age| +------+---+ | Taro| 25| |Hanako| 30| | Yuki| 20| +------+---+
RDDは、parallelize
で作成しており、行の定義としてRow
というクラスを使用しています。
RDDから作成するといっても、これまでの例と同様です。createDataFrame
の引数として、RDDのデータであるrdd
を渡して、PySparkのDataFrameを作成します。
スキーマを指定した作成方法
PySparkのDataFrameでは、各列がどういった型であるかを指定しなくても使用することは可能です。これは、動的型付けとしてPySparkが型をデータの内容から決めてくれているためです。
この方法では、型付けの際に誤った型として判断してしまうと後の処理で困る場合があります。例えば、日付列として扱いたいのに文字列として扱われて、日付としての処理がうまくできないといったことも考えられます。
PySparkでは、スキーマととしてデータの構造を指定したうえでDataFrameを作成することができます。以降では、スキーマを指定したDataFrameの作成方法を紹介します。
スキーマを指定した作成方法の基本
PySparkのDataFrameを作成する際にスキーマを指定する場合には、以下のようにStructType
やStructField
を使ってスキーマを定義してから読み込みます。
from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType # SparkSessionの初期化 spark = SparkSession.builder.appName("DataFrameWithSchema").getOrCreate() # スキーマを定義 schema = StructType([ StructField("name", StringType(), True), StructField("age", IntegerType(), True), ]) # データを作成 data = [("Taro", 25), ("Hanako", 30), ("Yuki", 20)] # スキーマを指定してデータフレームを生成 df = spark.createDataFrame(data, schema=schema) # DataFrameの内容を表示 df.show() # SparkSessionを終了 spark.stop()
【実行結果】 +------+---+ | name|age| +------+---+ | Taro| 25| |Hanako| 30| | Yuki| 20| +------+---+
上記例のようにDataFrameの作成前に、StructType
とStructField
を使ってスキーマ(schema
)を定義します。StructType
とStructField
は、pyspark.sql.types
からインポートします。
各列の定義は同じくpyspark.sql.types
から必要な型定義をインポートします。上記例では文字列のStringType
と数値のIntegerType
を使用しています。
StructField
の引数として、第1引数に列名、第2引数に型(StringType()
やIntegerType()
等)を指定します。第3引数のTrue
は、その列がNullを取り得るかを示します。Trueの場合は、当該列はNullを取り得ますが、Falseの場合はNullを含むことを許しません。
これらのStructFieldのリストをStructTypeに渡すことで、スキーマが作成できます。
スキーマを使用してPySparkのDataFrameを作成する場合には、createDataFrame
のschema
引数に上記の通り作成したスキーマを指定します。これによりスキーマの定義に従ってDataFrameが作成されます。
スキーマでNullチェックをする場合
上記でStructField
の第3引数は、その列がNullを取り得るかを示すと説明しました。この項目をFalse
に設定した際には、Nullを許可しないため以下の例のように例外となります。
from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType # SparkSessionの初期化 spark = SparkSession.builder.appName("DataFrameNullCheck").getOrCreate() # スキーマを定義 schema = StructType([ StructField("name", StringType(), False), StructField("age", IntegerType(), True), ]) # データを作成 data = [(None, 25), ("Hanako", 30), ("Yuki", 20)] # スキーマを指定してデータフレームを生成 df = spark.createDataFrame(data, schema=schema) # DataFrameの内容を表示 df.show() # SparkSessionを終了 spark.stop()
【実行結果】 --------------------------------------------------------------------------- PySparkValueError Traceback (most recent call last) ...(途中省略)... PySparkValueError: [CANNOT_BE_NONE] Argument `obj` can not be None.
スキーマで指定可能なデータ型
スキーマを指定する際にStructField
で指定できるデータ型は、PySparkのpyspark.sql.types
モジュールに定義されています。主なデータ型の一部は以下になります。
スキーマを検討する際に参考にしていただければと思います。なお、PySparkのデータ型の詳細は、公式ドキュメントのこちらを参照してください。
データ型 | 概要 |
---|---|
ByteType | バイト型 |
ShortType | short整数型 |
IntegerType | int整数型 |
LongType | long整数型 |
FloatType | 単精度浮動小数点型 |
DoubleType | 倍精度浮動小数点型 |
DecimalType | 10進数型 |
StringType | 文字列型 |
BinaryType | バイナリ型 |
BooleanType | ブール型 |
TimestampType | タイムスタンプ型 |
DateType | 日付型 |
NullType | Nullのみの型 |
まとめ
PySparkでDataFrameを作成する方法について解説しました。PySparkは、分散処理フレームワークであるApache SparkのPython用APIです。
本記事では、Pythonのデータ、NumPyやpandasのデータからPySparkのDataFrameを作成する基本的な方法や、StructType
やStructField
を使ってスキーマを事前に定義しておいてDataFrameを作成する方法について紹介しています。
DataFrameは、PySparkでプログラミングする際の中心的なものとなっています。ぜひ、作成方法をしっかりと理解して使えるようにしていただければと思います。
上記で紹介しているソースコードについてはgithubにて公開しています。参考にしていただければと思います。