PySpark

【PySpark】DataFrameの行・列の基本的な操作方法

【PySpark】DataFrameの行・列の基本的な操作方法

PySparkにおいて、DataFrameの行や列を操作する方法について解説します。

DataFrameの行・列の基本操作

PySparkのDataFrameは、Sparkを使用してデータ処理を行うためのデータ構造です。

PySparkは、分散処理フレームワークであるApache SparkのPython用APIです。SparkとPySparkの概要やSparkアプリケーションの概念については「Apache SparkとPySparkの概要」や「Sparkアプリケーションの概念を理解する」でまとめていますので興味があれば参考にしてください

本記事では、PySparkでDataFrameの行や列を操作する方法について紹介します。

なお、実行環境としては「PySparkの実行環境をDockerで用意する方法」で説明しているDockerでのJupyter Notebook実行環境を使用しています。

行 (Rows) の操作

DataFrameの作成

まずは、行の操作を説明するためのDataFrameを以下のように作成します。以降の行操作は、このDataFrameをベースにして説明します。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# SparkSessionの初期化
spark = SparkSession.builder.appName("ColumnsOperation").getOrCreate()

# スキーマを定義
schema = StructType([
    StructField("Firstname", StringType(), True),
    StructField("Lastname", StringType(), True),
    StructField("Gender", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Department", StringType(), True),
    StructField("Salary", IntegerType(), True),    
])
# データを作成
data = [
    ("James", "Smith", "M", 30, "Sales", 3000),
    ("Anna", "Rose", "F", 41, "Engineering", 4000),
    ("Robert", "Williams", "M", 62, "Logistics", 5000),
]
# データフレームの作成
df = spark.createDataFrame(data, schema=schema)
df.printSchema()
df.show()
【実行結果】
root
 |-- Firstname: string (nullable = true)
 |-- Lastname: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: integer (nullable = true)

+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|    James|   Smith|     M| 30|      Sales|  3000|
|     Anna|    Rose|     F| 41|Engineering|  4000|
|   Robert|Williams|     M| 62|  Logistics|  5000|
+---------+--------+------+---+-----------+------+

DataFrameの作成やスキーマに関する説明は省略します。「DataFrameの作成方法とスキーマ」で詳細を説明していますので、興味があれば参考にしてください。

なお、処理を完了した後にはsparkセッションを終了しますが、以降の説明では省略します。

# SparkSessionを終了
spark.stop()

以降では、上記で作成したデータフレームを使って、行に対する基本操作の方法を説明していきます。

行の追加 union

DataFrameに行を追加する際には、以下のようにunionを使用します。

# ===== 行の追加 union
# 追加する行を作成
new_data = [
    ("Michael", "Brown", "M", 34, "Sales", 3500),
    ("Maria", "Garcia", "F", 20, "Engineering", 2000),
    ("Mary", "Smith", "F", 25, "Accounting", 2500),
]
new_row = spark.createDataFrame(new_data, schema)
# データフレームに追加
df = df.union(new_row)
df.show()
【実行結果】
+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|    James|   Smith|     M| 30|      Sales|  3000|
|     Anna|    Rose|     F| 41|Engineering|  4000|
|   Robert|Williams|     M| 62|  Logistics|  5000|
|  Michael|   Brown|     M| 34|      Sales|  3500|
|    Maria|  Garcia|     F| 20|Engineering|  2000|
|     Mary|   Smith|     F| 25| Accounting|  2500|
+---------+--------+------+---+-----------+------+

上記例では、まずnew_dataをもとにしてnew_rowというDataFrameを作成しています。データフレームに行を追加するには、unionメソッドに作成したDataFrameを渡すことで追加が可能です。

注意点として、unionは、結合する2つのDataFrameのスキーマが完全に一致していることが必要です。スキーマが異なる場合にはエラーが発生するので注意しましょう。

条件をもとに行を抽出 filter/where

指定した条件をもとに行を抽出する場合には、以下のようにfilterまたはwhereを使用します。

# ===== 条件をもとに行を抽出 filter/where
# filterで式で抽出
df_filter = df.filter(df.Age > 30)
df_filter.show()
# filterで文字列条件で抽出
df_filter = df.filter("Age > 30")
df_filter.show()

# --- whereはfilterのエイリアスとなっている
# whereで式で抽出
df_where = df.where(df.Age > 30)
df_where.show()

# whereで文字列条件で抽出
df_where = df.where("Age > 30")
df_where.show()
【実行結果】複数回出力しているがいずれも出力結果は同じ
+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|     Anna|    Rose|     F| 41|Engineering|  4000|
|   Robert|Williams|     M| 62|  Logistics|  5000|
|  Michael|   Brown|     M| 34|      Sales|  3500|
+---------+--------+------+---+-----------+------+

抽出されている実行結果はいずれも同じです。まず、filterwhereは同じものになるためどちらを使うかは開発者次第です。実際に、公式ドキュメントにも「where() is an alias for filter().」というように、wherefilterの別名(エイリアス)と記載されていますので、いずれを使っても問題ありません。

条件の指定の方法としては、「df.Age > 30」のように値にアクセスした条件式で指定する方法と「"Age > 30"」のようにSQLの条件文字列として指定する方法を使用できます。

行の削除 filter

行の削除をしたい場合には、以下のようにfilterで削除対象の行を除いたDataFrameを作ることで実現します。

# ===== 行の削除 filter
# 行を削除
df = df.filter(df.Firstname != "Michael")
df.show()
【実行結果】
+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|    James|   Smith|     M| 30|      Sales|  3000|
|     Anna|    Rose|     F| 41|Engineering|  4000|
|   Robert|Williams|     M| 62|  Logistics|  5000|
|    Maria|  Garcia|     F| 20|Engineering|  2000|
|     Mary|   Smith|     F| 25| Accounting|  2500|
+---------+--------+------+---+-----------+------+

上記例では、Firstname"Micahel"でない行のみを抽出している、つまりはFirstname"Micahel"である行を削除しているということになります。

行の並べ替え sort/orderBy 

DataFrameの行を並べ替えたい場合には、以下のようにsortまたはorderByを使用します。

# ===== 行の並び替え sort/orderBy
# === sortで並び替え
print("===== sort")
# 昇順
df_sort = df.sort(df.Age)
df_sort.show()

# 降順
df_sort = df.sort(df.Age.desc())
df_sort.show()

# 複数列による並び替え
df_sort = df.sort(df.Age, df.Salary.desc())
df_sort.show()

# === orderByで並び替え
print("===== orderBy")
# 昇順
df_orderby = df.orderBy(df.Age)
df_orderby.show()

# 降順
df_orderby = df.orderBy(df.Age.desc())
df_orderby.show()

# 複数条件を指定
df_orderby = df.orderBy(df.Age, df.Salary.desc())
df_orderby.show()
【実行結果】sortもorderByも出力結果は同じ
+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|    Maria|  Garcia|     F| 20|Engineering|  2000|
|     Mary|   Smith|     F| 30| Accounting|  2500|
|    James|   Smith|     M| 30|      Sales|  3000|
|  Michael|   Brown|     M| 34|      Sales|  3500|
|     Anna|    Rose|     F| 41|Engineering|  4000|
|   Robert|Williams|     M| 62|  Logistics|  5000|
+---------+--------+------+---+-----------+------+

+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|   Robert|Williams|     M| 62|  Logistics|  5000|
|     Anna|    Rose|     F| 41|Engineering|  4000|
|  Michael|   Brown|     M| 34|      Sales|  3500|
|     Mary|   Smith|     F| 30| Accounting|  2500|
|    James|   Smith|     M| 30|      Sales|  3000|
|    Maria|  Garcia|     F| 20|Engineering|  2000|
+---------+--------+------+---+-----------+------+

+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|    Maria|  Garcia|     F| 20|Engineering|  2000|
|    James|   Smith|     M| 30|      Sales|  3000|
|     Mary|   Smith|     F| 30| Accounting|  2500|
|  Michael|   Brown|     M| 34|      Sales|  3500|
|     Anna|    Rose|     F| 41|Engineering|  4000|
|   Robert|Williams|     M| 62|  Logistics|  5000|
+---------+--------+------+---+-----------+------+

sortorderByは使い方としては同じです。上記例のように、sortorderByの引数にdf.Ageといった並び替えのキーとなる列を指定することで並び替えができます。デフォルトは昇順となっています。

降順にしたい場合には、指定する際にdf.Age.desc()のように降順にするdescメソッドを指定します。また、複数列により並び替えを行う場合には、引数に(df.Age, df.Salary.desc())のように指定することで実現できます。この例では、第1キーとしてAge列で昇順に並び替え、第2キーとしてSalary列で降順に並び替えています。

なお、以下のように文字列で列名を指定することも可能です。

from pyspark.sql.functions import col

# 昇順
df_sort = df.sort("Age")

# 降順
df_sort = df.sort(col("Age").desc())

# 複数列による並び替え
df_sort = df.sort("Age", col("Salary").desc())

文字列で指定する場合で、降順に指定する際には、pyspark.sql.functionsからcolをインポートして指定することでメソッドを使用することが可能です。

行のグループ化 groupBy

行のグループ化を行って集約をするような場合には、以下のようにgroupByを使用することができます。

# ===== 行のグループ化 groupBy
# グループ化して行をカウント
df_groupby_count = df.groupBy("Department").count()
df_groupby_count.show()

# グループ化して最大値を計算
df_groupby_max = df.groupBy("Department").max()
df_groupby_max.show()
【実行結果】
+-----------+-----+
| Department|count|
+-----------+-----+
|      Sales|    2|
|Engineering|    2|
|  Logistics|    1|
| Accounting|    1|
+-----------+-----+

+-----------+--------+-----------+
| Department|max(Age)|max(Salary)|
+-----------+--------+-----------+
|      Sales|      34|       3500|
|Engineering|      41|       4000|
|  Logistics|      62|       5000|
| Accounting|      30|       2500|
+-----------+--------+-----------+

上記例のようにgroupByの引数にグループ化する列名を指定します。その後、count()max()のような集約関数を適用することで、グループごとの集約結果を得ることが可能です。

なお、groupBy()の別名(エイリアス)として、groupby()を使うことも可能です。

行の重複削除 dropDuplicates

重複する行を削除したい場合には、以下のようにdropDuplicatesを使用します。

# 行の重複削除 dropDuplicates
# 重複行を追加
new_row = spark.createDataFrame([
    ("Michael", "Brown", "M", 34, "Sales", 3500),
    ("Maria", "Garcia", "F", 20, "Engineering", 2000),
    ("Mary", "Smith", "F", 30, "Accounting", 2500),
], schema)
# データフレームに追加
df_temp = df.union(new_row)
df_temp.show()

# ==== 重複行を削除
print("===== dropDuplicates")
# すべての列が重複するものを削除
df_drop = df_temp.dropDuplicates()
df_drop.show()

# 指定の列で重複する行を削除
df_drop = df_temp.dropDuplicates(["Lastname"])
df_drop.show()

# 複数列で重複する行を削除
df_drop = df_temp.dropDuplicates(["Firstname", "Lastname"])
df_drop.show()
【実行結果】
+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|    James|   Smith|     M| 30|      Sales|  3000|
|     Anna|    Rose|     F| 41|Engineering|  4000|
|   Robert|Williams|     M| 62|  Logistics|  5000|
|  Michael|   Brown|     M| 34|      Sales|  3500|
|    Maria|  Garcia|     F| 20|Engineering|  2000|
|     Mary|   Smith|     F| 30| Accounting|  2500|
|  Michael|   Brown|     M| 34|      Sales|  3500|
|    Maria|  Garcia|     F| 20|Engineering|  2000|
|     Mary|   Smith|     F| 30| Accounting|  2500|
+---------+--------+------+---+-----------+------+

===== dropDuplicates
+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|    James|   Smith|     M| 30|      Sales|  3000|
|     Anna|    Rose|     F| 41|Engineering|  4000|
|   Robert|Williams|     M| 62|  Logistics|  5000|
|  Michael|   Brown|     M| 34|      Sales|  3500|
|    Maria|  Garcia|     F| 20|Engineering|  2000|
|     Mary|   Smith|     F| 30| Accounting|  2500|
+---------+--------+------+---+-----------+------+

+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|  Michael|   Brown|     M| 34|      Sales|  3500|
|    Maria|  Garcia|     F| 20|Engineering|  2000|
|     Anna|    Rose|     F| 41|Engineering|  4000|
|    James|   Smith|     M| 30|      Sales|  3000|
|   Robert|Williams|     M| 62|  Logistics|  5000|
+---------+--------+------+---+-----------+------+

+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|     Anna|    Rose|     F| 41|Engineering|  4000|
|    James|   Smith|     M| 30|      Sales|  3000|
|    Maria|  Garcia|     F| 20|Engineering|  2000|
|     Mary|   Smith|     F| 30| Accounting|  2500|
|  Michael|   Brown|     M| 34|      Sales|  3500|
|   Robert|Williams|     M| 62|  Logistics|  5000|
+---------+--------+------+---+-----------+------+

上記例では、あえて重複する行をunionで追加して、その後dropDuplicatesで重複行を削除しています。引数を指定しないでdropDuplicatesを指定する場合、すべての列が重複するような行が削除されます。

ある特定の列で重複削除したい場合は、["Lastname"]のように列を指定できます。複数列で重複を確認したい場合は、["Firstname", "Lastname"]のように列名を列挙することで対応可能です。

なお、dropDuplicates()の別名(エイリアス)として、drop_duplicates()を使うことも可能です。

列 (Columns) の基本操作

DataFrameの作成

まずは、列の操作を説明するためのDataFrameを以下のように作成します。以降の列操作は、このDataFrameをベースにして説明します。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# SparkSessionの初期化
spark = SparkSession.builder.appName("ColumnsOperation").getOrCreate()

# スキーマを定義
schema = StructType([
    StructField("Firstname", StringType(), True),
    StructField("Lastname", StringType(), True),
    StructField("Gender", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Department", StringType(), True),
    StructField("Salary", IntegerType(), True),    
])
# データを作成
data = [
    ("James", "Smith", "M", 30, "Sales", 3000),
    ("Anna", "Rose", "F", 41, "Engineering", 4000),
    ("Robert", "Williams", "M", 62, "Logistics", 5000),
]
# データフレームの作成
df = spark.createDataFrame(data, schema=schema)
df.printSchema()
df.show()
【実行結果】
root
 |-- Firstname: string (nullable = true)
 |-- Lastname: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: integer (nullable = true)

+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|    James|   Smith|     M| 30|      Sales|  3000|
|     Anna|    Rose|     F| 41|Engineering|  4000|
|   Robert|Williams|     M| 62|  Logistics|  5000|
+---------+--------+------+---+-----------+------+

DataFrameの作成やスキーマに関する説明は省略します。「DataFrameの作成方法とスキーマ」で詳細を説明していますので、興味があれば参考にしてください。

なお、処理を完了した後にはsparkセッションを終了しますが、以降の説明では省略します。

# SparkSessionを終了
spark.stop()

以降では、上記で作成したデータフレームを使って、列に対する基本操作の方法を説明していきます。

列の追加 withColumn

列を追加したい場合は、以下のようにwithColumnを使用します。

from pyspark.sql.functions import lit

#===== 列の追加 withColumn
# 定数列を追加
df = df.withColumn("Country", lit("USA"))
df = df.withColumn("Rate", lit(1.1).cast(FloatType()))
df.printSchema()
df.show()
【実行結果】
root
 |-- Firstname: string (nullable = true)
 |-- Lastname: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Country: string (nullable = false)
 |-- Rate: float (nullable = false)

+---------+--------+------+---+-----------+------+-------+----+
|Firstname|Lastname|Gender|Age| Department|Salary|Country|Rate|
+---------+--------+------+---+-----------+------+-------+----+
|    James|   Smith|     M| 30|      Sales|  3000|    USA| 1.1|
|     Anna|    Rose|     F| 41|Engineering|  4000|    USA| 1.1|
|   Robert|Williams|     M| 62|  Logistics|  5000|    USA| 1.1|
+---------+--------+------+---+-----------+------+-------+----+

上記例のようにwithColumnには、列名と値を渡すことができます。pyspark.sql.functionsからインポートできるlitを使うと定数値列を簡単に作成できます。litはリテラルの意味です。

また、指定した型として作成したい場合は、castを使って型指定することで、指定した型で作成可能です。上記例では1.1という数値をFloatType()として作成しています。

列の名称変更 withColumnRenamed

列の名称変更をしたい場合は、以下のようにwithColumnRenamedを使用します。

# ===== 列の名称変更 withColumnRenamed
df = df.withColumnRenamed("Department", "Dept")
df.printSchema()
df.show()
【実行結果】
root
 |-- Firstname: string (nullable = true)
 |-- Lastname: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Dept: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Country: string (nullable = false)
 |-- Rate: float (nullable = false)

+---------+--------+------+---+-----------+------+-------+----+
|Firstname|Lastname|Gender|Age|       Dept|Salary|Country|Rate|
+---------+--------+------+---+-----------+------+-------+----+
|    James|   Smith|     M| 30|      Sales|  3000|    USA| 1.1|
|     Anna|    Rose|     F| 41|Engineering|  4000|    USA| 1.1|
|   Robert|Williams|     M| 62|  Logistics|  5000|    USA| 1.1|
+---------+--------+------+---+-----------+------+-------+----+

withColumnRenamedの引数として、変更前の列名と、変更後の列名を指定することで列名を変更できます。上記例では、Department列をDeptという形で省略形に変更しています。

列の削除 drop

列自体を削除したい場合は、以下のようにdropを使用します。

# ===== 列の削除 drop
df = df.drop("Country")
df.printSchema()
df.show()
【実行結果】
root
 |-- Firstname: string (nullable = true)
 |-- Lastname: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Dept: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Rate: float (nullable = false)

+---------+--------+------+---+-----------+------+----+
|Firstname|Lastname|Gender|Age|       Dept|Salary|Rate|
+---------+--------+------+---+-----------+------+----+
|    James|   Smith|     M| 30|      Sales|  3000| 1.1|
|     Anna|    Rose|     F| 41|Engineering|  4000| 1.1|
|   Robert|Williams|     M| 62|  Logistics|  5000| 1.1|
+---------+--------+------+---+-----------+------+----+

dropで対象列を指定することで列情報を削除することが可能です。上記例では、Country列を削除しています。

列の選択 select

もともとのDataFrameから特定の列だけを選択したい場合には、以下のようにselectを使用します。

# ===== 列の選択 select
df_selected = df.select(["Firstname", "Age"])
df_selected.printSchema()
df_selected.show()
【実行結果】
root
 |-- Firstname: string (nullable = true)
 |-- Age: integer (nullable = true)

+---------+---+
|Firstname|Age|
+---------+---+
|    James| 30|
|     Anna| 41|
|   Robert| 62|
+---------+---+

上記例のようにselectに抽出したい特定の列を列挙したリストを渡すことで、列選択をすることが可能です。

式の適用 expr

列の操作をする際には、いくつかの列を使って計算をしたくなることがよくあります。このような場合には、以下のようにexprを使用すると便利です。

from pyspark.sql.functions import expr

# ===== 式の適用 expr
df = df.withColumn("Increased Salary", expr("Salary * Rate"))
df.printSchema()
df.show()
【実行結果】
root
 |-- Firstname: string (nullable = true)
 |-- Lastname: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Dept: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Rate: float (nullable = false)
 |-- Increased Salary: float (nullable = true)

+---------+--------+------+---+-----------+------+----+----------------+
|Firstname|Lastname|Gender|Age|       Dept|Salary|Rate|Increased Salary|
+---------+--------+------+---+-----------+------+----+----------------+
|    James|   Smith|     M| 30|      Sales|  3000| 1.1|          3300.0|
|     Anna|    Rose|     F| 41|Engineering|  4000| 1.1|          4400.0|
|   Robert|Williams|     M| 62|  Logistics|  5000| 1.1|          5500.0|
+---------+--------+------+---+-----------+------+----+----------------+

exprでは、文字列で列を用いてSQL等の式を指定できます。上記で紹介したwithColumnと合わせて使うことで、計算式を適用した列を簡単に作成することが可能です。

まとめ

PySparkにおいて、DataFrameの行や列を操作する方法について解説しました。

PySparkでは、分散処理環境を使ってデータ分析を効率的に実行することが可能です。DataFrameの行や列の操作は、各種分析などを進めていくにあたっての基本となります。今回は、行や列に対する基本的な操作について各種紹介しました。

ぜひ使い方を覚えてもらって効率的なデータ分析に活用してもらいたいと思います。