PySpark

【PySpark】DataFrameの作成方法とスキーマ

【PySpark】DataFrameの作成方法とスキーマ
naoki-hn

PySpark で DataFrame を作成する方法について解説します。

DataFrame の作成方法

PySpark は、分散処理フレームワーク Apache Spark の Python 用 API です。PySpark では、DataFrame というデータ構造を使用します。

この記事では、PySpark の DataFrame の生成方法やスキーマの使い方について紹介します。

実行環境は、Docker で構築した Spark 環境の Jupyter Notebook を使用します。環境構築方法は「PySparkの実行環境をDockerで用意する方法」を参考にしてください。

Spark と PySpark 概要や Spark アプリケーションの概念は「Apache SparkとPySparkの概要」や「Sparkアプリケーションの概念を理解する」を参考にしてください。

DataFrame の基本的な作成方法

PySpark の DataFrame は、様々な方法で作成をすることができます。以降では、主要な DataFrame 作成方法について紹介します。

タプルのリストから作成する

PySpark の DataFrame は、Python のタプルのリストから作成できます。各タプルがデータの各行を表すと思うと分かりやすいかと思います。

from pyspark.sql import SparkSession

# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()

# データをタプルのリストで用意
data = [("Taro", 25), ("Hanako", 30), ("Yuki", 20)]

# DataFrameを生成
df = spark.createDataFrame(data, ["name", "age"])

# DataFrameの内容を表示
df.show()

# SparkSessionを終了
spark.stop()
【実行結果】
+------+---+
|  name|age|
+------+---+
|  Taro| 25|
|Hanako| 30|
|  Yuki| 20|
+------+---+

以下の部分は Spark のセッションを初期化し、最後に終了する部分です。以降の例でも出てきますが説明は省略しますので覚えておいてください。

# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()

...(省略)...

# SparkSessionを終了
spark.stop()

Spark セッションは「SparkSession.builder.appName("appname").getOrCreate()」でセッションを初期化します。"appname" の部分は Spark アプリケーションの名前です。なお、SparkSession を終了する際は stop() を使用します。

DataFrameの作成では、データをタプルのリストとして data を用意しています。各タプルの値が、データの各行と思ってください。DataFrame は、createDataFrame で作成します。引数として、用意した data と各列の名称リストを渡しています。この例では、name 列と age 列を持っていることになります。

最後に、DataFrame の内容を show で表示して内容を確認しています。

辞書のリストから作成する

PySpark の DataFrame は、以下のように辞書のリストからも作成できます。

from pyspark.sql import SparkSession

# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()

# データを辞書のリストで用意
data = [{"name": "Taro", "age": 25}, {"name": "Hanako", "age": 30}, {"name": "Yuki", "age": 20}]

# DataFrameを生成
df = spark.createDataFrame(data)

# DataFrameの内容を表示
df.show()

# SparkSessionを終了
spark.stop()
【実行結果】
+---+------+
|age|  name|
+---+------+
| 25|  Taro|
| 30|Hanako|
| 20|  Yuki|
+---+------+

上記例では、データを辞書のキーが列名として使用されます。タプルのリストを使用する場合には、createDataFrame の際に列名のリストを指定する必要がありましたが、辞書のリストを使用する場合、各キーが列名として扱われるため、data を渡すだけでPySpark の DataFrame が作成できます。

NumPy配列から作成する

PySpark の DataFrame は、Numpy の配列 (ndarray) からも作成できます。

from pyspark.sql import SparkSession
import numpy as np

# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()

# データをnumpyのndarrayで用意
array = np.array([("Taro", 25), ("Hanako", 30), ("Yuki", 20)])

# DataFrameを生成
df = spark.createDataFrame(array, ["name", "age"])

# DataFrameの内容を表示
df.show()

# SparkSessionを終了
spark.stop()
【実行結果】
+------+---+
|  name|age|
+------+---+
|  Taro| 25|
|Hanako| 30|
|  Yuki| 20|
+------+---+

NumPy 配列 (ndarray)を使用する場合は、Python のタプルのリストを使用する場合と同じです。createDataFrame の際に、NumPy 配列の array と列名リストを渡して PySpark の DataFrame を作成します。

pandas の DataFrame から作成する

PySpark の DataFrame は、pandasのDataFrame からも作成できます。

from pyspark.sql import SparkSession
import pandas as pd

# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()

# データをpandasのDataFrameで用意
pandas_df = pd.DataFrame({"name": ["Taro", "Hanako", "Yuki"], "age": [25, 30, 20]})

# DataFrameを生成
df = spark.createDataFrame(pandas_df)

# DataFrameの内容を表示
df.show()

# SparkSessionを終了
spark.stop()
【実行結果】
+------+---+
|  name|age|
+------+---+
|  Taro| 25|
|Hanako| 30|
|  Yuki| 20|
+------+---+

上記例のように、pandas の DataFrame を使用する場合は、Python の辞書のリストを使用する場合と同じです。createDataFrame の際に pandas の DataFrame である pandas_df を渡して、PySpark の DataFrame を作成します。

注意点

pandas の DataFrame は、PySpark の DataFrame と似ていますが、全く異なるものですので注意してください。pandas の DataFrame の方が、使いやすいメソッドが多いといったことがありますが、pandas の DataFrame では、Spark 環境でも単一ノードでの処理になってしまい適切に分散がされなくなります。

RDD (Resilient Distributed Dataset) から作成する

RDD (Resilient Distributed Dataset) は、Spark での主要なデータ構造です。PySpark で利用できる DataFrame は、RDD の上に構築された高レベルの API となっていて、土台としては RDD となっています。

DataFrame と RDD のどちらの API を使うかという選択がありますが、DataFrame は、Spark の最適化エンジンである Catalyst により自動的にクエリ最適化が行われる点や、多くのユーザーにとって使いやすい API が提供されています。基本的にはDataFrame の使用が推奨されます。

ただし、特定要件で低レベルの操作や細かな制御が必要な場合は、RDD を直接使用する場合もありますので、RDD から DataFrame を作成する方法を見ておきましょう。

PySpark の DataFrame を、RDD から作成する場合は以下のようにします。

from pyspark.sql import SparkSession
from pyspark.sql import Row

# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()

# データをRDD(Resilient Distributed Dataset)で用意
rdd = spark.sparkContext.parallelize(
    [Row(name="Taro", age=25), Row(name="Hanako", age=30), Row(name="Yuki", age=20)]
)

# DataFrameを生成
df = spark.createDataFrame(rdd)

# DataFrameの内容を表示
df.show()

# SparkSessionを終了
spark.stop()
【実行結果】
+------+---+
|  name|age|
+------+---+
|  Taro| 25|
|Hanako| 30|
|  Yuki| 20|
+------+---+

RDD は、parallelize で作成し、行の定義で Row というクラスを使用します。RDD から作成するといっても、これまでの例と同様です。createDataFrame の引数として、RDD のデータである rdd を渡して、PySpark の DataFrame を作成します。

スキーマを指定した作成方法

PySpark の DataFrame では、各列がどういった型であるかを指定しなくても使用することは可能です。これは、動的型付けとして PySpark が型をデータの内容から決めてくれるためです。

この方法では、型付けの際に誤った型として判断してしまうと後の処理で困る場合があります。例えば、日付列として扱いたいのに文字列として扱われて、日付としての処理がうまくできないといったことが考えられます。

PySpark では、スキーマととしてデータ構造を指定したうえで DataFrame を作成できます。以降では、スキーマを指定した DataFrame の作成方法を紹介します。

スキーマを指定した作成方法の基本

PySpark の DataFrame を作成する際にスキーマを指定する場合には、以下のように StructTypeStructField を使ってスキーマを定義してから読み込みます。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameWithSchema").getOrCreate()

# スキーマを定義
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

# データを作成
data = [("Taro", 25), ("Hanako", 30), ("Yuki", 20)]

# スキーマを指定してデータフレームを生成
df = spark.createDataFrame(data, schema=schema)

# DataFrameの内容を表示
df.show()

# SparkSessionを終了
spark.stop()
【実行結果】
+------+---+
|  name|age|
+------+---+
|  Taro| 25|
|Hanako| 30|
|  Yuki| 20|
+------+---+

上記例のように DataFrame 作成前に、StructTypeStructField を使ってスキーマ (schema) を定義します。StructTypeStructField は、pyspark.sql.types からインポートします。

各列の定義は同じく pyspark.sql.types から必要な型定義をインポートします。上記例では文字列の StringType と数値の IntegerType を使用しています。

StructField の引数として、第 1 引数に列名、第 2 引数に型 (StringType()IntegerType() 等) を指定します。第 3 引数の True は、その列が Null を取り得るかを示します。True の場合は、当該列は Null を取り得ますが、False の場合は Null を含むことを許可しません。

StructField のリストを StructType に渡すことで、スキーマが作成できます。スキーマを使用して PySpark の DataFrame を作成する場合には、createDataFrameschema 引数に上記で作成したスキーマを指定します。これによりスキーマの定義に従って DataFrame が作成されます。

スキーマで Null チェックをする場合

上記で StructField の第 3 引数は、その列が Null を取り得るかを示すと説明しました。この項目を False に設定した際には、Null を許可しないため以下の例のように例外となります。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# SparkSessionの初期化
spark = SparkSession.builder.appName("DataFrameNullCheck").getOrCreate()

# スキーマを定義
schema = StructType([
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), True),
])

# データを作成
data = [(None, 25), ("Hanako", 30), ("Yuki", 20)]

# スキーマを指定してデータフレームを生成
df = spark.createDataFrame(data, schema=schema)

# DataFrameの内容を表示
df.show()

# SparkSessionを終了
spark.stop()
【実行結果】
---------------------------------------------------------------------------
PySparkValueError                         Traceback (most recent call last)
...(途中省略)...
PySparkValueError: [CANNOT_BE_NONE] Argument `obj` can not be None.

スキーマで指定可能なデータ型

スキーマを指定する際に StructField で指定できるデータ型は、PySparkの pyspark.sql.types モジュールで定義されています。主なデータ型は以下になります。

スキーマを検討する際に参考にしてください。なお、PySpark のデータ型の詳細は、公式ドキュメントのこちらを参照してください。

データ型概要
ByteTypeバイト型
ShortTypeshort 整数型
IntegerTypeint 整数型
LongTypelong 整数型
FloatType単精度浮動小数点型
DoubleType倍精度浮動小数点型
DecimalType10 進数型
StringType文字列型
BinaryTypeバイナリ型
BooleanTypeブール型
TimestampTypeタイムスタンプ型
DateType日付型
NullTypeNull のみの型

まとめ

PySpark で DataFrameを作成する方法について解説しました。PySpark は、分散処理フレームワークである Apache Spark の Python 用 API です。

この記事では、Python のデータ、NumPy や pandas のデータから PySpark のDataFrame を作成する基本的な方法や StructTypeStructField を使ってスキーマを事前に定義して DataFrame を作成する方法について紹介しています。

DataFrame は、PySpark でプログラミングする際の中心的なものです。ぜひ、作成方法をしっかりと理解して使えるようにしていただければと思います。

ソースコード

上記で紹介しているソースコードについては GitHub にて公開しています。参考にしていただければと思います。

あわせて読みたい
【Python Tech】プログラミングガイド
【Python Tech】プログラミングガイド
ABOUT ME
ホッシー
ホッシー
システムエンジニア
はじめまして。当サイトをご覧いただきありがとうございます。 私は製造業のメーカーで、DX推進や業務システムの設計・開発・導入を担当しているシステムエンジニアです。これまでに転職も経験しており、以前は大手電機メーカーでシステム開発に携わっていました。

プログラミング言語はこれまでC、C++、JAVA等を扱ってきましたが、最近では特に機械学習等の分析でも注目されているPythonについてとても興味をもって取り組んでいます。これまでの経験をもとに、Pythonに興味を持つ方のお役に立てるような情報を発信していきたいと思います。どうぞよろしくお願いいたします。

※キャラクターデザイン:ゼイルン様
記事URLをコピーしました