PySpark

【PySpark】DataFrameの行・列の基本的な操作方法

【PySpark】DataFrameの行・列の基本的な操作方法
naoki-hn

PySpark で DataFrame の行や列を操作する方法について解説します。

DataFrame の行・列の基本操作

PySpark は、分散処理フレームワーク Apache Spark の Python 用 API です。PySpark では、DataFrame というデータ構造を使用します。

この記事では PySpark で DataFrame の行や列を操作する方法を紹介します。

実行環境は、Docker で構築した Spark 環境の Jupyter Notebook を使用します。環境構築方法は「PySparkの実行環境をDockerで用意する方法」を参考にしてください。

Spark と PySpark 概要や Spark アプリケーションの概念は「Apache SparkとPySparkの概要」や「Sparkアプリケーションの概念を理解する」を参考にしてください。

行 (Rows) の操作

DataFrame の作成

操作の説明するための DataFrame を以下のように作成します。以降の操作は、この DataFrame をベースにして説明します。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# SparkSessionの初期化
spark = SparkSession.builder.appName("ColumnsOperation").getOrCreate()

# スキーマを定義
schema = StructType([
    StructField("Firstname", StringType(), True),
    StructField("Lastname", StringType(), True),
    StructField("Gender", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Department", StringType(), True),
    StructField("Salary", IntegerType(), True),    
])
# データを作成
data = [
    ("James", "Smith", "M", 30, "Sales", 3000),
    ("Anna", "Rose", "F", 41, "Engineering", 4000),
    ("Robert", "Williams", "M", 62, "Logistics", 5000),
]
# データフレームの作成
df = spark.createDataFrame(data, schema=schema)
df.printSchema()
df.show()
【実行結果】
root
 |-- Firstname: string (nullable = true)
 |-- Lastname: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: integer (nullable = true)

+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|    James|   Smith|     M| 30|      Sales|  3000|
|     Anna|    Rose|     F| 41|Engineering|  4000|
|   Robert|Williams|     M| 62|  Logistics|  5000|
+---------+--------+------+---+-----------+------+

なお、処理完了後には SparkSession を終了しますが説明では省略します。

# SparkSessionを終了
spark.stop()

以降では、上記で作成した DataFrame を使って行の基本操作を説明します。

DataFrame の作成やスキーマに関する説明は省略しています。「DataFrameの作成方法とスキーマ」を参考にしてください。

行の追加 union

DataFrame に行を追加する際は union を使用します。

# ===== 行の追加 union
# 追加する行を作成
new_data = [
    ("Michael", "Brown", "M", 34, "Sales", 3500),
    ("Maria", "Garcia", "F", 20, "Engineering", 2000),
    ("Mary", "Smith", "F", 25, "Accounting", 2500),
]
new_row = spark.createDataFrame(new_data, schema)
# データフレームに追加
df = df.union(new_row)
df.show()
【実行結果】
+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|    James|   Smith|     M| 30|      Sales|  3000|
|     Anna|    Rose|     F| 41|Engineering|  4000|
|   Robert|Williams|     M| 62|  Logistics|  5000|
|  Michael|   Brown|     M| 34|      Sales|  3500|
|    Maria|  Garcia|     F| 20|Engineering|  2000|
|     Mary|   Smith|     F| 25| Accounting|  2500|
+---------+--------+------+---+-----------+------+

例では、まず new_data をもとにして new_row という DataFrame を作成しています。データフレームに行を追加するには union メソッドに作成した DataFrame を渡します。

注意点として union は「結合する 2 つのDataFrameのスキーマが完全に一致していること」が必要です。スキーマが異なる場合は、エラーとなるので注意しましょう。

条件をもとに行を抽出 filter / where

条件をもとに行を抽出する場合には、filter または where を使用します。

# ===== 条件をもとに行を抽出 filter/where
# filterで式で抽出
df_filter = df.filter(df.Age > 30)
df_filter.show()
# filterで文字列条件で抽出
df_filter = df.filter("Age > 30")
df_filter.show()

# --- whereはfilterのエイリアスとなっている
# whereで式で抽出
df_where = df.where(df.Age > 30)
df_where.show()

# whereで文字列条件で抽出
df_where = df.where("Age > 30")
df_where.show()
【実行結果】複数回出力しているがいずれも出力結果は同じ
+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|     Anna|    Rose|     F| 41|Engineering|  4000|
|   Robert|Williams|     M| 62|  Logistics|  5000|
|  Michael|   Brown|     M| 34|      Sales|  3500|
+---------+--------+------+---+-----------+------+

抽出されている実行結果はいずれも同じです。filterwhere は同じものになるためどちらを使うかは開発者次第です。実際に、公式ドキュメントにも「where() is an alias for filter().」というように、wherefilter の別名(エイリアス)と記載されていますので、いずれを使っても構いません。

条件指定の方法は「df.Age > 30」のように条件式を指定する方法と「"Age > 30"」のように SQL の条件文字列として指定する方法が使用できます。

行の削除 filter

行の削除をしたい場合には、filter で削除対象の行を除いた DataFrame を作ることで実現します。

# ===== 行の削除 filter
# 行を削除
df = df.filter(df.Firstname != "Michael")
df.show()
【実行結果】
+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|    James|   Smith|     M| 30|      Sales|  3000|
|     Anna|    Rose|     F| 41|Engineering|  4000|
|   Robert|Williams|     M| 62|  Logistics|  5000|
|    Maria|  Garcia|     F| 20|Engineering|  2000|
|     Mary|   Smith|     F| 25| Accounting|  2500|
+---------+--------+------+---+-----------+------+

上記例では Firstname"Micahel" でない行のみを抽出しています。これは、Firstname"Micahel" である行を削除していることと同じです。

【なぜ削除を filter で行うのか】

Spark では DataFrame の元になっている RDD(Resilient Distributed Dataset)が不変(immutable)で内容を書き換えることができない仕組みになっています。そのため、行を「削除する」というよりは、不要な行を除いた新しい DataFrame を作り直す形で処理します。

行の並べ替え sort / orderBy 

DataFrame の行を並べ替えたい場合には sort または orderBy を使用します。

# ===== 行の並び替え sort/orderBy
# === sortで並び替え
print("===== sort")
# 昇順
df_sort = df.sort(df.Age)
df_sort.show()

# 降順
df_sort = df.sort(df.Age.desc())
df_sort.show()

# 複数列による並び替え
df_sort = df.sort(df.Age, df.Salary.desc())
df_sort.show()

# === orderByで並び替え
print("===== orderBy")
# 昇順
df_orderby = df.orderBy(df.Age)
df_orderby.show()

# 降順
df_orderby = df.orderBy(df.Age.desc())
df_orderby.show()

# 複数条件を指定
df_orderby = df.orderBy(df.Age, df.Salary.desc())
df_orderby.show()
【実行結果】sortもorderByも出力結果は同じ
+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|    Maria|  Garcia|     F| 20|Engineering|  2000|
|     Mary|   Smith|     F| 30| Accounting|  2500|
|    James|   Smith|     M| 30|      Sales|  3000|
|  Michael|   Brown|     M| 34|      Sales|  3500|
|     Anna|    Rose|     F| 41|Engineering|  4000|
|   Robert|Williams|     M| 62|  Logistics|  5000|
+---------+--------+------+---+-----------+------+

+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|   Robert|Williams|     M| 62|  Logistics|  5000|
|     Anna|    Rose|     F| 41|Engineering|  4000|
|  Michael|   Brown|     M| 34|      Sales|  3500|
|     Mary|   Smith|     F| 30| Accounting|  2500|
|    James|   Smith|     M| 30|      Sales|  3000|
|    Maria|  Garcia|     F| 20|Engineering|  2000|
+---------+--------+------+---+-----------+------+

+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|    Maria|  Garcia|     F| 20|Engineering|  2000|
|    James|   Smith|     M| 30|      Sales|  3000|
|     Mary|   Smith|     F| 30| Accounting|  2500|
|  Michael|   Brown|     M| 34|      Sales|  3500|
|     Anna|    Rose|     F| 41|Engineering|  4000|
|   Robert|Williams|     M| 62|  Logistics|  5000|
+---------+--------+------+---+-----------+------+

sortorderBy の使い方は同じです。上記例のように sortorderBy の引数に df.Age といった並び替えのキーとなる列を指定することで並び替えができます。デフォルトは昇順となります。

降順にしたい場合には、指定する際に df.Age.desc() のように降順にする desc メソッドを指定します。また、複数列により並び替えを行う場合には、引数に (df.Age, df.Salary.desc()) のように指定することで実現できます。例では、第 1 キーとして Age 列で昇順に並び替え、第 2 キーとして Salary 列で降順に並び替えます。

なお、以下のように文字列で列名を指定することも可能です。

from pyspark.sql.functions import col

# 昇順
df_sort = df.sort("Age")

# 降順
df_sort = df.sort(col("Age").desc())

# 複数列による並び替え
df_sort = df.sort("Age", col("Salary").desc())

文字列で指定する場合で、降順に指定する際には pyspark.sql.functions から col をインポートして指定することでメソッドを使用することが可能です。

行のグループ化 groupBy

行のグループ化して集約するには、groupBy を使用します。

# ===== 行のグループ化 groupBy
# グループ化して行をカウント
df_groupby_count = df.groupBy("Department").count()
df_groupby_count.show()

# グループ化して最大値を計算
df_groupby_max = df.groupBy("Department").max()
df_groupby_max.show()
【実行結果】
+-----------+-----+
| Department|count|
+-----------+-----+
|      Sales|    2|
|Engineering|    2|
|  Logistics|    1|
| Accounting|    1|
+-----------+-----+

+-----------+--------+-----------+
| Department|max(Age)|max(Salary)|
+-----------+--------+-----------+
|      Sales|      34|       3500|
|Engineering|      41|       4000|
|  Logistics|      62|       5000|
| Accounting|      30|       2500|
+-----------+--------+-----------+

例のように groupBy の引数にグループ化する列名を指定します。その後、count()max() のような集約関数を適用することで、グループごとの集約が可能です。なお、groupBy() の別名(エイリアス)として groupby() を使うことも可能です。

行の重複削除 dropDuplicates

重複する行を削除したい場合には、dropDuplicates を使用します。

# 行の重複削除 dropDuplicates
# 重複行を追加
new_row = spark.createDataFrame([
    ("Michael", "Brown", "M", 34, "Sales", 3500),
    ("Maria", "Garcia", "F", 20, "Engineering", 2000),
    ("Mary", "Smith", "F", 30, "Accounting", 2500),
], schema)
# データフレームに追加
df_temp = df.union(new_row)
df_temp.show()

# ==== 重複行を削除
print("===== dropDuplicates")
# すべての列が重複するものを削除
df_drop = df_temp.dropDuplicates()
df_drop.show()

# 指定の列で重複する行を削除
df_drop = df_temp.dropDuplicates(["Lastname"])
df_drop.show()

# 複数列で重複する行を削除
df_drop = df_temp.dropDuplicates(["Firstname", "Lastname"])
df_drop.show()
【実行結果】
+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|    James|   Smith|     M| 30|      Sales|  3000|
|     Anna|    Rose|     F| 41|Engineering|  4000|
|   Robert|Williams|     M| 62|  Logistics|  5000|
|  Michael|   Brown|     M| 34|      Sales|  3500|
|    Maria|  Garcia|     F| 20|Engineering|  2000|
|     Mary|   Smith|     F| 30| Accounting|  2500|
|  Michael|   Brown|     M| 34|      Sales|  3500|
|    Maria|  Garcia|     F| 20|Engineering|  2000|
|     Mary|   Smith|     F| 30| Accounting|  2500|
+---------+--------+------+---+-----------+------+

===== dropDuplicates
+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|    James|   Smith|     M| 30|      Sales|  3000|
|     Anna|    Rose|     F| 41|Engineering|  4000|
|   Robert|Williams|     M| 62|  Logistics|  5000|
|  Michael|   Brown|     M| 34|      Sales|  3500|
|    Maria|  Garcia|     F| 20|Engineering|  2000|
|     Mary|   Smith|     F| 30| Accounting|  2500|
+---------+--------+------+---+-----------+------+

+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|  Michael|   Brown|     M| 34|      Sales|  3500|
|    Maria|  Garcia|     F| 20|Engineering|  2000|
|     Anna|    Rose|     F| 41|Engineering|  4000|
|    James|   Smith|     M| 30|      Sales|  3000|
|   Robert|Williams|     M| 62|  Logistics|  5000|
+---------+--------+------+---+-----------+------+

+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|     Anna|    Rose|     F| 41|Engineering|  4000|
|    James|   Smith|     M| 30|      Sales|  3000|
|    Maria|  Garcia|     F| 20|Engineering|  2000|
|     Mary|   Smith|     F| 30| Accounting|  2500|
|  Michael|   Brown|     M| 34|      Sales|  3500|
|   Robert|Williams|     M| 62|  Logistics|  5000|
+---------+--------+------+---+-----------+------+

例では、重複する行を union で追加して、その後 dropDuplicates で重複行を削除しています。引数を指定しないで dropDuplicates を指定する場合、すべての列が重複するような行が削除されます。

特定の列で重複削除したい場合は ["Lastname"] のように列を指定できます。複数列で重複を確認したい場合は ["Firstname", "Lastname"] のように列名を列挙します。

なお、dropDuplicates() の別名(エイリアス)として drop_duplicates() を使うことも可能です。

列 (Columns) の基本操作

DataFrame の作成

列の操作を説明するための DataFrame を以下のように作成します。以降の列操作は、この DataFrame をベースにして説明します。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# SparkSessionの初期化
spark = SparkSession.builder.appName("ColumnsOperation").getOrCreate()

# スキーマを定義
schema = StructType([
    StructField("Firstname", StringType(), True),
    StructField("Lastname", StringType(), True),
    StructField("Gender", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Department", StringType(), True),
    StructField("Salary", IntegerType(), True),    
])
# データを作成
data = [
    ("James", "Smith", "M", 30, "Sales", 3000),
    ("Anna", "Rose", "F", 41, "Engineering", 4000),
    ("Robert", "Williams", "M", 62, "Logistics", 5000),
]
# データフレームの作成
df = spark.createDataFrame(data, schema=schema)
df.printSchema()
df.show()
【実行結果】
root
 |-- Firstname: string (nullable = true)
 |-- Lastname: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: integer (nullable = true)

+---------+--------+------+---+-----------+------+
|Firstname|Lastname|Gender|Age| Department|Salary|
+---------+--------+------+---+-----------+------+
|    James|   Smith|     M| 30|      Sales|  3000|
|     Anna|    Rose|     F| 41|Engineering|  4000|
|   Robert|Williams|     M| 62|  Logistics|  5000|
+---------+--------+------+---+-----------+------+

以降では、上記で作成したデータフレームを使って列の基本操作を説明します。

列の追加 withColumn

列を追加したい場合は、withColumn を使用します。

from pyspark.sql.functions import lit

#===== 列の追加 withColumn
# 定数列を追加
df = df.withColumn("Country", lit("USA"))
df = df.withColumn("Rate", lit(1.1).cast(FloatType()))
df.printSchema()
df.show()
【実行結果】
root
 |-- Firstname: string (nullable = true)
 |-- Lastname: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Country: string (nullable = false)
 |-- Rate: float (nullable = false)

+---------+--------+------+---+-----------+------+-------+----+
|Firstname|Lastname|Gender|Age| Department|Salary|Country|Rate|
+---------+--------+------+---+-----------+------+-------+----+
|    James|   Smith|     M| 30|      Sales|  3000|    USA| 1.1|
|     Anna|    Rose|     F| 41|Engineering|  4000|    USA| 1.1|
|   Robert|Williams|     M| 62|  Logistics|  5000|    USA| 1.1|
+---------+--------+------+---+-----------+------+-------+----+

例のように withColumn には、列名と値を渡します。pyspark.sql.functions からインポートできる lit を使うと定数値列を作成できます。lit はリテラルの意味です。

また、指定した型で作成したい場合は cast を使って型指定できます。上記例では 1.1 という数値を FloatType() として作成しています。

列の名称変更 withColumnRenamed

列の名称変更をしたい場合は、withColumnRenamed を使用します。

# ===== 列の名称変更 withColumnRenamed
df = df.withColumnRenamed("Department", "Dept")
df.printSchema()
df.show()
【実行結果】
root
 |-- Firstname: string (nullable = true)
 |-- Lastname: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Dept: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Country: string (nullable = false)
 |-- Rate: float (nullable = false)

+---------+--------+------+---+-----------+------+-------+----+
|Firstname|Lastname|Gender|Age|       Dept|Salary|Country|Rate|
+---------+--------+------+---+-----------+------+-------+----+
|    James|   Smith|     M| 30|      Sales|  3000|    USA| 1.1|
|     Anna|    Rose|     F| 41|Engineering|  4000|    USA| 1.1|
|   Robert|Williams|     M| 62|  Logistics|  5000|    USA| 1.1|
+---------+--------+------+---+-----------+------+-------+----+

withColumnRenamed の引数として、変更前の列名と変更後の列名を指定することで列名を変更できます。例では、Department 列を Dept という省略形に変更しています。

列の削除 drop

列自体を削除したい場合は、drop を使用します。

# ===== 列の削除 drop
df = df.drop("Country")
df.printSchema()
df.show()
【実行結果】
root
 |-- Firstname: string (nullable = true)
 |-- Lastname: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Dept: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Rate: float (nullable = false)

+---------+--------+------+---+-----------+------+----+
|Firstname|Lastname|Gender|Age|       Dept|Salary|Rate|
+---------+--------+------+---+-----------+------+----+
|    James|   Smith|     M| 30|      Sales|  3000| 1.1|
|     Anna|    Rose|     F| 41|Engineering|  4000| 1.1|
|   Robert|Williams|     M| 62|  Logistics|  5000| 1.1|
+---------+--------+------+---+-----------+------+----+

drop で対象列を指定することで列情報を削除できます。例では、Country 列を削除しています。

列の選択 select

特定列だけを選択したい場合には、select を使用します。

# ===== 列の選択 select
df_selected = df.select(["Firstname", "Age"])
df_selected.printSchema()
df_selected.show()
【実行結果】
root
 |-- Firstname: string (nullable = true)
 |-- Age: integer (nullable = true)

+---------+---+
|Firstname|Age|
+---------+---+
|    James| 30|
|     Anna| 41|
|   Robert| 62|
+---------+---+

例のように select に抽出したい特定の列を列挙したリストを渡すことで、列選択をすることが可能です。

式の適用 expr

列の操作をする際には、いくつかの列を使って計算をしたくなることがよくあります。このような場合には、expr を使用すると便利です。

from pyspark.sql.functions import expr

# ===== 式の適用 expr
df = df.withColumn("Increased Salary", expr("Salary * Rate"))
df.printSchema()
df.show()
【実行結果】
root
 |-- Firstname: string (nullable = true)
 |-- Lastname: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Dept: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Rate: float (nullable = false)
 |-- Increased Salary: float (nullable = true)

+---------+--------+------+---+-----------+------+----+----------------+
|Firstname|Lastname|Gender|Age|       Dept|Salary|Rate|Increased Salary|
+---------+--------+------+---+-----------+------+----+----------------+
|    James|   Smith|     M| 30|      Sales|  3000| 1.1|          3300.0|
|     Anna|    Rose|     F| 41|Engineering|  4000| 1.1|          4400.0|
|   Robert|Williams|     M| 62|  Logistics|  5000| 1.1|          5500.0|
+---------+--------+------+---+-----------+------+----+----------------+

expr では、文字列を用いて SQL の式を指定できます。上記で紹介した withColumn と一緒に使うことで、計算式を適用した列を簡単に作成可能です。

まとめ

PySpark において DataFrame の行や列を操作する方法について解説しました。

PySpark では、分散処理環境を使ってデータ分析を効率的に実行することが可能です。DataFrame の行や列の操作は、各種分析などを進めていくにあたっての基本となります。この記事では、行や列に対する基本操作について各種紹介しました。

各種使い方をしっかり覚えて、効率的なデータ分析に活用してもらいたいと思います。

ソースコード

上記で紹介しているソースコードについては GitHub にて公開しています。参考にしていただければと思います。

あわせて読みたい
【Python Tech】プログラミングガイド
【Python Tech】プログラミングガイド

ABOUT ME
ホッシー
ホッシー
システムエンジニア
はじめまして。当サイトをご覧いただきありがとうございます。 私は製造業のメーカーで、DX推進や業務システムの設計・開発・導入を担当しているシステムエンジニアです。これまでに転職も経験しており、以前は大手電機メーカーでシステム開発に携わっていました。

プログラミング言語はこれまでC、C++、JAVA等を扱ってきましたが、最近では特に機械学習等の分析でも注目されているPythonについてとても興味をもって取り組んでいます。これまでの経験をもとに、Pythonに興味を持つ方のお役に立てるような情報を発信していきたいと思います。どうぞよろしくお願いいたします。

※キャラクターデザイン:ゼイルン様
記事URLをコピーしました