【PySpark】DataFrameの行・列の基本的な操作方法

PySpark で DataFrame の行や列を操作する方法について解説します。
DataFrame の行・列の基本操作
PySpark は、分散処理フレームワーク Apache Spark の Python 用 API です。PySpark では、DataFrame というデータ構造を使用します。
この記事では PySpark で DataFrame の行や列を操作する方法を紹介します。
行 (Rows) の操作
DataFrame の作成
操作の説明するための DataFrame を以下のように作成します。以降の操作は、この DataFrame をベースにして説明します。
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
# SparkSessionの初期化
spark = SparkSession.builder.appName("ColumnsOperation").getOrCreate()
# スキーマを定義
schema = StructType([
StructField("Firstname", StringType(), True),
StructField("Lastname", StringType(), True),
StructField("Gender", StringType(), True),
StructField("Age", IntegerType(), True),
StructField("Department", StringType(), True),
StructField("Salary", IntegerType(), True),
])
# データを作成
data = [
("James", "Smith", "M", 30, "Sales", 3000),
("Anna", "Rose", "F", 41, "Engineering", 4000),
("Robert", "Williams", "M", 62, "Logistics", 5000),
]
# データフレームの作成
df = spark.createDataFrame(data, schema=schema)
df.printSchema()
df.show()【実行結果】 root |-- Firstname: string (nullable = true) |-- Lastname: string (nullable = true) |-- Gender: string (nullable = true) |-- Age: integer (nullable = true) |-- Department: string (nullable = true) |-- Salary: integer (nullable = true) +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | James| Smith| M| 30| Sales| 3000| | Anna| Rose| F| 41|Engineering| 4000| | Robert|Williams| M| 62| Logistics| 5000| +---------+--------+------+---+-----------+------+
なお、処理完了後には SparkSession を終了しますが説明では省略します。
# SparkSessionを終了 spark.stop()
以降では、上記で作成した DataFrame を使って行の基本操作を説明します。
行の追加 union
DataFrame に行を追加する際は union を使用します。
# ===== 行の追加 union
# 追加する行を作成
new_data = [
("Michael", "Brown", "M", 34, "Sales", 3500),
("Maria", "Garcia", "F", 20, "Engineering", 2000),
("Mary", "Smith", "F", 25, "Accounting", 2500),
]
new_row = spark.createDataFrame(new_data, schema)
# データフレームに追加
df = df.union(new_row)
df.show()【実行結果】 +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | James| Smith| M| 30| Sales| 3000| | Anna| Rose| F| 41|Engineering| 4000| | Robert|Williams| M| 62| Logistics| 5000| | Michael| Brown| M| 34| Sales| 3500| | Maria| Garcia| F| 20|Engineering| 2000| | Mary| Smith| F| 25| Accounting| 2500| +---------+--------+------+---+-----------+------+
例では、まず new_data をもとにして new_row という DataFrame を作成しています。データフレームに行を追加するには union メソッドに作成した DataFrame を渡します。
注意点として union は「結合する 2 つのDataFrameのスキーマが完全に一致していること」が必要です。スキーマが異なる場合は、エラーとなるので注意しましょう。
条件をもとに行を抽出 filter / where
条件をもとに行を抽出する場合には、filter または where を使用します。
# ===== 条件をもとに行を抽出 filter/where
# filterで式で抽出
df_filter = df.filter(df.Age > 30)
df_filter.show()
# filterで文字列条件で抽出
df_filter = df.filter("Age > 30")
df_filter.show()
# --- whereはfilterのエイリアスとなっている
# whereで式で抽出
df_where = df.where(df.Age > 30)
df_where.show()
# whereで文字列条件で抽出
df_where = df.where("Age > 30")
df_where.show()【実行結果】複数回出力しているがいずれも出力結果は同じ +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | Anna| Rose| F| 41|Engineering| 4000| | Robert|Williams| M| 62| Logistics| 5000| | Michael| Brown| M| 34| Sales| 3500| +---------+--------+------+---+-----------+------+
抽出されている実行結果はいずれも同じです。filter と where は同じものになるためどちらを使うかは開発者次第です。実際に、公式ドキュメントにも「where() is an alias for filter().」というように、where は filter の別名(エイリアス)と記載されていますので、いずれを使っても構いません。
条件指定の方法は「df.Age > 30」のように条件式を指定する方法と「"Age > 30"」のように SQL の条件文字列として指定する方法が使用できます。
行の削除 filter
行の削除をしたい場合には、filter で削除対象の行を除いた DataFrame を作ることで実現します。
# ===== 行の削除 filter # 行を削除 df = df.filter(df.Firstname != "Michael") df.show()
【実行結果】 +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | James| Smith| M| 30| Sales| 3000| | Anna| Rose| F| 41|Engineering| 4000| | Robert|Williams| M| 62| Logistics| 5000| | Maria| Garcia| F| 20|Engineering| 2000| | Mary| Smith| F| 25| Accounting| 2500| +---------+--------+------+---+-----------+------+
上記例では Firstname が "Micahel" でない行のみを抽出しています。これは、Firstname が "Micahel" である行を削除していることと同じです。
行の並べ替え sort / orderBy
DataFrame の行を並べ替えたい場合には sort または orderBy を使用します。
# ===== 行の並び替え sort/orderBy
# === sortで並び替え
print("===== sort")
# 昇順
df_sort = df.sort(df.Age)
df_sort.show()
# 降順
df_sort = df.sort(df.Age.desc())
df_sort.show()
# 複数列による並び替え
df_sort = df.sort(df.Age, df.Salary.desc())
df_sort.show()
# === orderByで並び替え
print("===== orderBy")
# 昇順
df_orderby = df.orderBy(df.Age)
df_orderby.show()
# 降順
df_orderby = df.orderBy(df.Age.desc())
df_orderby.show()
# 複数条件を指定
df_orderby = df.orderBy(df.Age, df.Salary.desc())
df_orderby.show()【実行結果】sortもorderByも出力結果は同じ +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | Maria| Garcia| F| 20|Engineering| 2000| | Mary| Smith| F| 30| Accounting| 2500| | James| Smith| M| 30| Sales| 3000| | Michael| Brown| M| 34| Sales| 3500| | Anna| Rose| F| 41|Engineering| 4000| | Robert|Williams| M| 62| Logistics| 5000| +---------+--------+------+---+-----------+------+ +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | Robert|Williams| M| 62| Logistics| 5000| | Anna| Rose| F| 41|Engineering| 4000| | Michael| Brown| M| 34| Sales| 3500| | Mary| Smith| F| 30| Accounting| 2500| | James| Smith| M| 30| Sales| 3000| | Maria| Garcia| F| 20|Engineering| 2000| +---------+--------+------+---+-----------+------+ +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | Maria| Garcia| F| 20|Engineering| 2000| | James| Smith| M| 30| Sales| 3000| | Mary| Smith| F| 30| Accounting| 2500| | Michael| Brown| M| 34| Sales| 3500| | Anna| Rose| F| 41|Engineering| 4000| | Robert|Williams| M| 62| Logistics| 5000| +---------+--------+------+---+-----------+------+
sort と orderBy の使い方は同じです。上記例のように sort や orderBy の引数に df.Age といった並び替えのキーとなる列を指定することで並び替えができます。デフォルトは昇順となります。
降順にしたい場合には、指定する際に df.Age.desc() のように降順にする desc メソッドを指定します。また、複数列により並び替えを行う場合には、引数に (df.Age, df.Salary.desc()) のように指定することで実現できます。例では、第 1 キーとして Age 列で昇順に並び替え、第 2 キーとして Salary 列で降順に並び替えます。
なお、以下のように文字列で列名を指定することも可能です。
from pyspark.sql.functions import col
# 昇順
df_sort = df.sort("Age")
# 降順
df_sort = df.sort(col("Age").desc())
# 複数列による並び替え
df_sort = df.sort("Age", col("Salary").desc())文字列で指定する場合で、降順に指定する際には pyspark.sql.functions から col をインポートして指定することでメソッドを使用することが可能です。
行のグループ化 groupBy
行のグループ化して集約するには、groupBy を使用します。
# ===== 行のグループ化 groupBy
# グループ化して行をカウント
df_groupby_count = df.groupBy("Department").count()
df_groupby_count.show()
# グループ化して最大値を計算
df_groupby_max = df.groupBy("Department").max()
df_groupby_max.show()【実行結果】 +-----------+-----+ | Department|count| +-----------+-----+ | Sales| 2| |Engineering| 2| | Logistics| 1| | Accounting| 1| +-----------+-----+ +-----------+--------+-----------+ | Department|max(Age)|max(Salary)| +-----------+--------+-----------+ | Sales| 34| 3500| |Engineering| 41| 4000| | Logistics| 62| 5000| | Accounting| 30| 2500| +-----------+--------+-----------+
例のように groupBy の引数にグループ化する列名を指定します。その後、count() や max() のような集約関数を適用することで、グループごとの集約が可能です。なお、groupBy() の別名(エイリアス)として groupby() を使うことも可能です。
行の重複削除 dropDuplicates
重複する行を削除したい場合には、dropDuplicates を使用します。
# 行の重複削除 dropDuplicates
# 重複行を追加
new_row = spark.createDataFrame([
("Michael", "Brown", "M", 34, "Sales", 3500),
("Maria", "Garcia", "F", 20, "Engineering", 2000),
("Mary", "Smith", "F", 30, "Accounting", 2500),
], schema)
# データフレームに追加
df_temp = df.union(new_row)
df_temp.show()
# ==== 重複行を削除
print("===== dropDuplicates")
# すべての列が重複するものを削除
df_drop = df_temp.dropDuplicates()
df_drop.show()
# 指定の列で重複する行を削除
df_drop = df_temp.dropDuplicates(["Lastname"])
df_drop.show()
# 複数列で重複する行を削除
df_drop = df_temp.dropDuplicates(["Firstname", "Lastname"])
df_drop.show()【実行結果】 +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | James| Smith| M| 30| Sales| 3000| | Anna| Rose| F| 41|Engineering| 4000| | Robert|Williams| M| 62| Logistics| 5000| | Michael| Brown| M| 34| Sales| 3500| | Maria| Garcia| F| 20|Engineering| 2000| | Mary| Smith| F| 30| Accounting| 2500| | Michael| Brown| M| 34| Sales| 3500| | Maria| Garcia| F| 20|Engineering| 2000| | Mary| Smith| F| 30| Accounting| 2500| +---------+--------+------+---+-----------+------+ ===== dropDuplicates +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | James| Smith| M| 30| Sales| 3000| | Anna| Rose| F| 41|Engineering| 4000| | Robert|Williams| M| 62| Logistics| 5000| | Michael| Brown| M| 34| Sales| 3500| | Maria| Garcia| F| 20|Engineering| 2000| | Mary| Smith| F| 30| Accounting| 2500| +---------+--------+------+---+-----------+------+ +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | Michael| Brown| M| 34| Sales| 3500| | Maria| Garcia| F| 20|Engineering| 2000| | Anna| Rose| F| 41|Engineering| 4000| | James| Smith| M| 30| Sales| 3000| | Robert|Williams| M| 62| Logistics| 5000| +---------+--------+------+---+-----------+------+ +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | Anna| Rose| F| 41|Engineering| 4000| | James| Smith| M| 30| Sales| 3000| | Maria| Garcia| F| 20|Engineering| 2000| | Mary| Smith| F| 30| Accounting| 2500| | Michael| Brown| M| 34| Sales| 3500| | Robert|Williams| M| 62| Logistics| 5000| +---------+--------+------+---+-----------+------+
例では、重複する行を union で追加して、その後 dropDuplicates で重複行を削除しています。引数を指定しないで dropDuplicates を指定する場合、すべての列が重複するような行が削除されます。
特定の列で重複削除したい場合は ["Lastname"] のように列を指定できます。複数列で重複を確認したい場合は ["Firstname", "Lastname"] のように列名を列挙します。
なお、dropDuplicates() の別名(エイリアス)として drop_duplicates() を使うことも可能です。
列 (Columns) の基本操作
DataFrame の作成
列の操作を説明するための DataFrame を以下のように作成します。以降の列操作は、この DataFrame をベースにして説明します。
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
# SparkSessionの初期化
spark = SparkSession.builder.appName("ColumnsOperation").getOrCreate()
# スキーマを定義
schema = StructType([
StructField("Firstname", StringType(), True),
StructField("Lastname", StringType(), True),
StructField("Gender", StringType(), True),
StructField("Age", IntegerType(), True),
StructField("Department", StringType(), True),
StructField("Salary", IntegerType(), True),
])
# データを作成
data = [
("James", "Smith", "M", 30, "Sales", 3000),
("Anna", "Rose", "F", 41, "Engineering", 4000),
("Robert", "Williams", "M", 62, "Logistics", 5000),
]
# データフレームの作成
df = spark.createDataFrame(data, schema=schema)
df.printSchema()
df.show()【実行結果】 root |-- Firstname: string (nullable = true) |-- Lastname: string (nullable = true) |-- Gender: string (nullable = true) |-- Age: integer (nullable = true) |-- Department: string (nullable = true) |-- Salary: integer (nullable = true) +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | James| Smith| M| 30| Sales| 3000| | Anna| Rose| F| 41|Engineering| 4000| | Robert|Williams| M| 62| Logistics| 5000| +---------+--------+------+---+-----------+------+
以降では、上記で作成したデータフレームを使って列の基本操作を説明します。
列の追加 withColumn
列を追加したい場合は、withColumn を使用します。
from pyspark.sql.functions import lit
#===== 列の追加 withColumn
# 定数列を追加
df = df.withColumn("Country", lit("USA"))
df = df.withColumn("Rate", lit(1.1).cast(FloatType()))
df.printSchema()
df.show()【実行結果】 root |-- Firstname: string (nullable = true) |-- Lastname: string (nullable = true) |-- Gender: string (nullable = true) |-- Age: integer (nullable = true) |-- Department: string (nullable = true) |-- Salary: integer (nullable = true) |-- Country: string (nullable = false) |-- Rate: float (nullable = false) +---------+--------+------+---+-----------+------+-------+----+ |Firstname|Lastname|Gender|Age| Department|Salary|Country|Rate| +---------+--------+------+---+-----------+------+-------+----+ | James| Smith| M| 30| Sales| 3000| USA| 1.1| | Anna| Rose| F| 41|Engineering| 4000| USA| 1.1| | Robert|Williams| M| 62| Logistics| 5000| USA| 1.1| +---------+--------+------+---+-----------+------+-------+----+
例のように withColumn には、列名と値を渡します。pyspark.sql.functions からインポートできる lit を使うと定数値列を作成できます。lit はリテラルの意味です。
また、指定した型で作成したい場合は cast を使って型指定できます。上記例では 1.1 という数値を FloatType() として作成しています。
列の名称変更 withColumnRenamed
列の名称変更をしたい場合は、withColumnRenamed を使用します。
# ===== 列の名称変更 withColumnRenamed
df = df.withColumnRenamed("Department", "Dept")
df.printSchema()
df.show()【実行結果】 root |-- Firstname: string (nullable = true) |-- Lastname: string (nullable = true) |-- Gender: string (nullable = true) |-- Age: integer (nullable = true) |-- Dept: string (nullable = true) |-- Salary: integer (nullable = true) |-- Country: string (nullable = false) |-- Rate: float (nullable = false) +---------+--------+------+---+-----------+------+-------+----+ |Firstname|Lastname|Gender|Age| Dept|Salary|Country|Rate| +---------+--------+------+---+-----------+------+-------+----+ | James| Smith| M| 30| Sales| 3000| USA| 1.1| | Anna| Rose| F| 41|Engineering| 4000| USA| 1.1| | Robert|Williams| M| 62| Logistics| 5000| USA| 1.1| +---------+--------+------+---+-----------+------+-------+----+
withColumnRenamed の引数として、変更前の列名と変更後の列名を指定することで列名を変更できます。例では、Department 列を Dept という省略形に変更しています。
列の削除 drop
列自体を削除したい場合は、drop を使用します。
# ===== 列の削除 drop
df = df.drop("Country")
df.printSchema()
df.show()【実行結果】 root |-- Firstname: string (nullable = true) |-- Lastname: string (nullable = true) |-- Gender: string (nullable = true) |-- Age: integer (nullable = true) |-- Dept: string (nullable = true) |-- Salary: integer (nullable = true) |-- Rate: float (nullable = false) +---------+--------+------+---+-----------+------+----+ |Firstname|Lastname|Gender|Age| Dept|Salary|Rate| +---------+--------+------+---+-----------+------+----+ | James| Smith| M| 30| Sales| 3000| 1.1| | Anna| Rose| F| 41|Engineering| 4000| 1.1| | Robert|Williams| M| 62| Logistics| 5000| 1.1| +---------+--------+------+---+-----------+------+----+
drop で対象列を指定することで列情報を削除できます。例では、Country 列を削除しています。
列の選択 select
特定列だけを選択したい場合には、select を使用します。
# ===== 列の選択 select df_selected = df.select(["Firstname", "Age"]) df_selected.printSchema() df_selected.show()
【実行結果】 root |-- Firstname: string (nullable = true) |-- Age: integer (nullable = true) +---------+---+ |Firstname|Age| +---------+---+ | James| 30| | Anna| 41| | Robert| 62| +---------+---+
例のように select に抽出したい特定の列を列挙したリストを渡すことで、列選択をすることが可能です。
式の適用 expr
列の操作をする際には、いくつかの列を使って計算をしたくなることがよくあります。このような場合には、expr を使用すると便利です。
from pyspark.sql.functions import expr
# ===== 式の適用 expr
df = df.withColumn("Increased Salary", expr("Salary * Rate"))
df.printSchema()
df.show()【実行結果】 root |-- Firstname: string (nullable = true) |-- Lastname: string (nullable = true) |-- Gender: string (nullable = true) |-- Age: integer (nullable = true) |-- Dept: string (nullable = true) |-- Salary: integer (nullable = true) |-- Rate: float (nullable = false) |-- Increased Salary: float (nullable = true) +---------+--------+------+---+-----------+------+----+----------------+ |Firstname|Lastname|Gender|Age| Dept|Salary|Rate|Increased Salary| +---------+--------+------+---+-----------+------+----+----------------+ | James| Smith| M| 30| Sales| 3000| 1.1| 3300.0| | Anna| Rose| F| 41|Engineering| 4000| 1.1| 4400.0| | Robert|Williams| M| 62| Logistics| 5000| 1.1| 5500.0| +---------+--------+------+---+-----------+------+----+----------------+
expr では、文字列を用いて SQL の式を指定できます。上記で紹介した withColumn と一緒に使うことで、計算式を適用した列を簡単に作成可能です。
まとめ
PySpark において DataFrame の行や列を操作する方法について解説しました。
PySpark では、分散処理環境を使ってデータ分析を効率的に実行することが可能です。DataFrame の行や列の操作は、各種分析などを進めていくにあたっての基本となります。この記事では、行や列に対する基本操作について各種紹介しました。
各種使い方をしっかり覚えて、効率的なデータ分析に活用してもらいたいと思います。
上記で紹介しているソースコードについては GitHub にて公開しています。参考にしていただければと思います。







