PySparkにおいて、DataFrameの行や列を操作する方法について解説します。
DataFrameの行・列の基本操作
PySparkのDataFrameは、Sparkを使用してデータ処理を行うためのデータ構造です。
PySparkは、分散処理フレームワークであるApache SparkのPython用APIです。SparkとPySparkの概要やSparkアプリケーションの概念については「Apache SparkとPySparkの概要」や「Sparkアプリケーションの概念を理解する」でまとめていますので興味があれば参考にしてください
本記事では、PySparkでDataFrameの行や列を操作する方法について紹介します。
なお、実行環境としては「PySparkの実行環境をDockerで用意する方法」で説明しているDockerでのJupyter Notebook実行環境を使用しています。
行 (Rows) の操作
DataFrameの作成
まずは、行の操作を説明するためのDataFrameを以下のように作成します。以降の行操作は、このDataFrameをベースにして説明します。
from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType # SparkSessionの初期化 spark = SparkSession.builder.appName("ColumnsOperation").getOrCreate() # スキーマを定義 schema = StructType([ StructField("Firstname", StringType(), True), StructField("Lastname", StringType(), True), StructField("Gender", StringType(), True), StructField("Age", IntegerType(), True), StructField("Department", StringType(), True), StructField("Salary", IntegerType(), True), ]) # データを作成 data = [ ("James", "Smith", "M", 30, "Sales", 3000), ("Anna", "Rose", "F", 41, "Engineering", 4000), ("Robert", "Williams", "M", 62, "Logistics", 5000), ] # データフレームの作成 df = spark.createDataFrame(data, schema=schema) df.printSchema() df.show()
【実行結果】 root |-- Firstname: string (nullable = true) |-- Lastname: string (nullable = true) |-- Gender: string (nullable = true) |-- Age: integer (nullable = true) |-- Department: string (nullable = true) |-- Salary: integer (nullable = true) +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | James| Smith| M| 30| Sales| 3000| | Anna| Rose| F| 41|Engineering| 4000| | Robert|Williams| M| 62| Logistics| 5000| +---------+--------+------+---+-----------+------+
DataFrameの作成やスキーマに関する説明は省略します。「DataFrameの作成方法とスキーマ」で詳細を説明していますので、興味があれば参考にしてください。
なお、処理を完了した後にはsparkセッションを終了しますが、以降の説明では省略します。
# SparkSessionを終了 spark.stop()
以降では、上記で作成したデータフレームを使って、行に対する基本操作の方法を説明していきます。
行の追加 union
DataFrameに行を追加する際には、以下のようにunion
を使用します。
# ===== 行の追加 union # 追加する行を作成 new_data = [ ("Michael", "Brown", "M", 34, "Sales", 3500), ("Maria", "Garcia", "F", 20, "Engineering", 2000), ("Mary", "Smith", "F", 25, "Accounting", 2500), ] new_row = spark.createDataFrame(new_data, schema) # データフレームに追加 df = df.union(new_row) df.show()
【実行結果】 +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | James| Smith| M| 30| Sales| 3000| | Anna| Rose| F| 41|Engineering| 4000| | Robert|Williams| M| 62| Logistics| 5000| | Michael| Brown| M| 34| Sales| 3500| | Maria| Garcia| F| 20|Engineering| 2000| | Mary| Smith| F| 25| Accounting| 2500| +---------+--------+------+---+-----------+------+
上記例では、まずnew_data
をもとにしてnew_row
というDataFrameを作成しています。データフレームに行を追加するには、union
メソッドに作成したDataFrameを渡すことで追加が可能です。
注意点として、union
は、結合する2つのDataFrameのスキーマが完全に一致していることが必要です。スキーマが異なる場合にはエラーが発生するので注意しましょう。
条件をもとに行を抽出 filter
/where
指定した条件をもとに行を抽出する場合には、以下のようにfilter
またはwhere
を使用します。
# ===== 条件をもとに行を抽出 filter/where # filterで式で抽出 df_filter = df.filter(df.Age > 30) df_filter.show() # filterで文字列条件で抽出 df_filter = df.filter("Age > 30") df_filter.show() # --- whereはfilterのエイリアスとなっている # whereで式で抽出 df_where = df.where(df.Age > 30) df_where.show() # whereで文字列条件で抽出 df_where = df.where("Age > 30") df_where.show()
【実行結果】複数回出力しているがいずれも出力結果は同じ +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | Anna| Rose| F| 41|Engineering| 4000| | Robert|Williams| M| 62| Logistics| 5000| | Michael| Brown| M| 34| Sales| 3500| +---------+--------+------+---+-----------+------+
抽出されている実行結果はいずれも同じです。まず、filter
とwhere
は同じものになるためどちらを使うかは開発者次第です。実際に、公式ドキュメントにも「where()
is an alias for filter()
.」というように、where
はfilter
の別名(エイリアス)と記載されていますので、いずれを使っても問題ありません。
条件の指定の方法としては、「df.Age > 30
」のように値にアクセスした条件式で指定する方法と「"Age > 30"
」のようにSQLの条件文字列として指定する方法を使用できます。
行の削除 filter
行の削除をしたい場合には、以下のようにfilter
で削除対象の行を除いたDataFrameを作ることで実現します。
# ===== 行の削除 filter # 行を削除 df = df.filter(df.Firstname != "Michael") df.show()
【実行結果】 +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | James| Smith| M| 30| Sales| 3000| | Anna| Rose| F| 41|Engineering| 4000| | Robert|Williams| M| 62| Logistics| 5000| | Maria| Garcia| F| 20|Engineering| 2000| | Mary| Smith| F| 25| Accounting| 2500| +---------+--------+------+---+-----------+------+
上記例では、Firstname
が"Micahel"
でない行のみを抽出している、つまりはFirstname
が"Micahel"
である行を削除しているということになります。
行の並べ替え sort
/orderBy
DataFrameの行を並べ替えたい場合には、以下のようにsort
またはorderBy
を使用します。
# ===== 行の並び替え sort/orderBy # === sortで並び替え print("===== sort") # 昇順 df_sort = df.sort(df.Age) df_sort.show() # 降順 df_sort = df.sort(df.Age.desc()) df_sort.show() # 複数列による並び替え df_sort = df.sort(df.Age, df.Salary.desc()) df_sort.show() # === orderByで並び替え print("===== orderBy") # 昇順 df_orderby = df.orderBy(df.Age) df_orderby.show() # 降順 df_orderby = df.orderBy(df.Age.desc()) df_orderby.show() # 複数条件を指定 df_orderby = df.orderBy(df.Age, df.Salary.desc()) df_orderby.show()
【実行結果】sortもorderByも出力結果は同じ +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | Maria| Garcia| F| 20|Engineering| 2000| | Mary| Smith| F| 30| Accounting| 2500| | James| Smith| M| 30| Sales| 3000| | Michael| Brown| M| 34| Sales| 3500| | Anna| Rose| F| 41|Engineering| 4000| | Robert|Williams| M| 62| Logistics| 5000| +---------+--------+------+---+-----------+------+ +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | Robert|Williams| M| 62| Logistics| 5000| | Anna| Rose| F| 41|Engineering| 4000| | Michael| Brown| M| 34| Sales| 3500| | Mary| Smith| F| 30| Accounting| 2500| | James| Smith| M| 30| Sales| 3000| | Maria| Garcia| F| 20|Engineering| 2000| +---------+--------+------+---+-----------+------+ +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | Maria| Garcia| F| 20|Engineering| 2000| | James| Smith| M| 30| Sales| 3000| | Mary| Smith| F| 30| Accounting| 2500| | Michael| Brown| M| 34| Sales| 3500| | Anna| Rose| F| 41|Engineering| 4000| | Robert|Williams| M| 62| Logistics| 5000| +---------+--------+------+---+-----------+------+
sort
とorderBy
は使い方としては同じです。上記例のように、sort
やorderBy
の引数にdf.Age
といった並び替えのキーとなる列を指定することで並び替えができます。デフォルトは昇順となっています。
降順にしたい場合には、指定する際にdf.Age.desc()
のように降順にするdesc
メソッドを指定します。また、複数列により並び替えを行う場合には、引数に(df.Age, df.Salary.desc())
のように指定することで実現できます。この例では、第1キーとしてAge
列で昇順に並び替え、第2キーとしてSalary
列で降順に並び替えています。
なお、以下のように文字列で列名を指定することも可能です。
from pyspark.sql.functions import col # 昇順 df_sort = df.sort("Age") # 降順 df_sort = df.sort(col("Age").desc()) # 複数列による並び替え df_sort = df.sort("Age", col("Salary").desc())
文字列で指定する場合で、降順に指定する際には、pyspark.sql.functions
からcol
をインポートして指定することでメソッドを使用することが可能です。
行のグループ化 groupBy
行のグループ化を行って集約をするような場合には、以下のようにgroupBy
を使用することができます。
# ===== 行のグループ化 groupBy # グループ化して行をカウント df_groupby_count = df.groupBy("Department").count() df_groupby_count.show() # グループ化して最大値を計算 df_groupby_max = df.groupBy("Department").max() df_groupby_max.show()
【実行結果】 +-----------+-----+ | Department|count| +-----------+-----+ | Sales| 2| |Engineering| 2| | Logistics| 1| | Accounting| 1| +-----------+-----+ +-----------+--------+-----------+ | Department|max(Age)|max(Salary)| +-----------+--------+-----------+ | Sales| 34| 3500| |Engineering| 41| 4000| | Logistics| 62| 5000| | Accounting| 30| 2500| +-----------+--------+-----------+
上記例のようにgroupBy
の引数にグループ化する列名を指定します。その後、count()
やmax()
のような集約関数を適用することで、グループごとの集約結果を得ることが可能です。
なお、groupBy()
の別名(エイリアス)として、groupby()
を使うことも可能です。
行の重複削除 dropDuplicates
重複する行を削除したい場合には、以下のようにdropDuplicates
を使用します。
# 行の重複削除 dropDuplicates # 重複行を追加 new_row = spark.createDataFrame([ ("Michael", "Brown", "M", 34, "Sales", 3500), ("Maria", "Garcia", "F", 20, "Engineering", 2000), ("Mary", "Smith", "F", 30, "Accounting", 2500), ], schema) # データフレームに追加 df_temp = df.union(new_row) df_temp.show() # ==== 重複行を削除 print("===== dropDuplicates") # すべての列が重複するものを削除 df_drop = df_temp.dropDuplicates() df_drop.show() # 指定の列で重複する行を削除 df_drop = df_temp.dropDuplicates(["Lastname"]) df_drop.show() # 複数列で重複する行を削除 df_drop = df_temp.dropDuplicates(["Firstname", "Lastname"]) df_drop.show()
【実行結果】 +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | James| Smith| M| 30| Sales| 3000| | Anna| Rose| F| 41|Engineering| 4000| | Robert|Williams| M| 62| Logistics| 5000| | Michael| Brown| M| 34| Sales| 3500| | Maria| Garcia| F| 20|Engineering| 2000| | Mary| Smith| F| 30| Accounting| 2500| | Michael| Brown| M| 34| Sales| 3500| | Maria| Garcia| F| 20|Engineering| 2000| | Mary| Smith| F| 30| Accounting| 2500| +---------+--------+------+---+-----------+------+ ===== dropDuplicates +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | James| Smith| M| 30| Sales| 3000| | Anna| Rose| F| 41|Engineering| 4000| | Robert|Williams| M| 62| Logistics| 5000| | Michael| Brown| M| 34| Sales| 3500| | Maria| Garcia| F| 20|Engineering| 2000| | Mary| Smith| F| 30| Accounting| 2500| +---------+--------+------+---+-----------+------+ +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | Michael| Brown| M| 34| Sales| 3500| | Maria| Garcia| F| 20|Engineering| 2000| | Anna| Rose| F| 41|Engineering| 4000| | James| Smith| M| 30| Sales| 3000| | Robert|Williams| M| 62| Logistics| 5000| +---------+--------+------+---+-----------+------+ +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | Anna| Rose| F| 41|Engineering| 4000| | James| Smith| M| 30| Sales| 3000| | Maria| Garcia| F| 20|Engineering| 2000| | Mary| Smith| F| 30| Accounting| 2500| | Michael| Brown| M| 34| Sales| 3500| | Robert|Williams| M| 62| Logistics| 5000| +---------+--------+------+---+-----------+------+
上記例では、あえて重複する行をunion
で追加して、その後dropDuplicates
で重複行を削除しています。引数を指定しないでdropDuplicates
を指定する場合、すべての列が重複するような行が削除されます。
ある特定の列で重複削除したい場合は、["Lastname"]
のように列を指定できます。複数列で重複を確認したい場合は、["Firstname", "Lastname"]
のように列名を列挙することで対応可能です。
なお、dropDuplicates()
の別名(エイリアス)として、drop_duplicates()
を使うことも可能です。
列 (Columns) の基本操作
DataFrameの作成
まずは、列の操作を説明するためのDataFrameを以下のように作成します。以降の列操作は、このDataFrameをベースにして説明します。
from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType # SparkSessionの初期化 spark = SparkSession.builder.appName("ColumnsOperation").getOrCreate() # スキーマを定義 schema = StructType([ StructField("Firstname", StringType(), True), StructField("Lastname", StringType(), True), StructField("Gender", StringType(), True), StructField("Age", IntegerType(), True), StructField("Department", StringType(), True), StructField("Salary", IntegerType(), True), ]) # データを作成 data = [ ("James", "Smith", "M", 30, "Sales", 3000), ("Anna", "Rose", "F", 41, "Engineering", 4000), ("Robert", "Williams", "M", 62, "Logistics", 5000), ] # データフレームの作成 df = spark.createDataFrame(data, schema=schema) df.printSchema() df.show()
【実行結果】 root |-- Firstname: string (nullable = true) |-- Lastname: string (nullable = true) |-- Gender: string (nullable = true) |-- Age: integer (nullable = true) |-- Department: string (nullable = true) |-- Salary: integer (nullable = true) +---------+--------+------+---+-----------+------+ |Firstname|Lastname|Gender|Age| Department|Salary| +---------+--------+------+---+-----------+------+ | James| Smith| M| 30| Sales| 3000| | Anna| Rose| F| 41|Engineering| 4000| | Robert|Williams| M| 62| Logistics| 5000| +---------+--------+------+---+-----------+------+
DataFrameの作成やスキーマに関する説明は省略します。「DataFrameの作成方法とスキーマ」で詳細を説明していますので、興味があれば参考にしてください。
なお、処理を完了した後にはsparkセッションを終了しますが、以降の説明では省略します。
# SparkSessionを終了 spark.stop()
以降では、上記で作成したデータフレームを使って、列に対する基本操作の方法を説明していきます。
列の追加 withColumn
列を追加したい場合は、以下のようにwithColumn
を使用します。
from pyspark.sql.functions import lit #===== 列の追加 withColumn # 定数列を追加 df = df.withColumn("Country", lit("USA")) df = df.withColumn("Rate", lit(1.1).cast(FloatType())) df.printSchema() df.show()
【実行結果】 root |-- Firstname: string (nullable = true) |-- Lastname: string (nullable = true) |-- Gender: string (nullable = true) |-- Age: integer (nullable = true) |-- Department: string (nullable = true) |-- Salary: integer (nullable = true) |-- Country: string (nullable = false) |-- Rate: float (nullable = false) +---------+--------+------+---+-----------+------+-------+----+ |Firstname|Lastname|Gender|Age| Department|Salary|Country|Rate| +---------+--------+------+---+-----------+------+-------+----+ | James| Smith| M| 30| Sales| 3000| USA| 1.1| | Anna| Rose| F| 41|Engineering| 4000| USA| 1.1| | Robert|Williams| M| 62| Logistics| 5000| USA| 1.1| +---------+--------+------+---+-----------+------+-------+----+
上記例のようにwithColumn
には、列名と値を渡すことができます。pyspark.sql.functions
からインポートできるlit
を使うと定数値列を簡単に作成できます。lit
はリテラルの意味です。
また、指定した型として作成したい場合は、cast
を使って型指定することで、指定した型で作成可能です。上記例では1.1という数値をFloatType()
として作成しています。
列の名称変更 withColumnRenamed
列の名称変更をしたい場合は、以下のようにwithColumnRenamed
を使用します。
# ===== 列の名称変更 withColumnRenamed df = df.withColumnRenamed("Department", "Dept") df.printSchema() df.show()
【実行結果】 root |-- Firstname: string (nullable = true) |-- Lastname: string (nullable = true) |-- Gender: string (nullable = true) |-- Age: integer (nullable = true) |-- Dept: string (nullable = true) |-- Salary: integer (nullable = true) |-- Country: string (nullable = false) |-- Rate: float (nullable = false) +---------+--------+------+---+-----------+------+-------+----+ |Firstname|Lastname|Gender|Age| Dept|Salary|Country|Rate| +---------+--------+------+---+-----------+------+-------+----+ | James| Smith| M| 30| Sales| 3000| USA| 1.1| | Anna| Rose| F| 41|Engineering| 4000| USA| 1.1| | Robert|Williams| M| 62| Logistics| 5000| USA| 1.1| +---------+--------+------+---+-----------+------+-------+----+
withColumnRenamed
の引数として、変更前の列名と、変更後の列名を指定することで列名を変更できます。上記例では、Department列をDeptという形で省略形に変更しています。
列の削除 drop
列自体を削除したい場合は、以下のようにdrop
を使用します。
# ===== 列の削除 drop df = df.drop("Country") df.printSchema() df.show()
【実行結果】 root |-- Firstname: string (nullable = true) |-- Lastname: string (nullable = true) |-- Gender: string (nullable = true) |-- Age: integer (nullable = true) |-- Dept: string (nullable = true) |-- Salary: integer (nullable = true) |-- Rate: float (nullable = false) +---------+--------+------+---+-----------+------+----+ |Firstname|Lastname|Gender|Age| Dept|Salary|Rate| +---------+--------+------+---+-----------+------+----+ | James| Smith| M| 30| Sales| 3000| 1.1| | Anna| Rose| F| 41|Engineering| 4000| 1.1| | Robert|Williams| M| 62| Logistics| 5000| 1.1| +---------+--------+------+---+-----------+------+----+
drop
で対象列を指定することで列情報を削除することが可能です。上記例では、Country列を削除しています。
列の選択 select
もともとのDataFrameから特定の列だけを選択したい場合には、以下のようにselect
を使用します。
# ===== 列の選択 select df_selected = df.select(["Firstname", "Age"]) df_selected.printSchema() df_selected.show()
【実行結果】 root |-- Firstname: string (nullable = true) |-- Age: integer (nullable = true) +---------+---+ |Firstname|Age| +---------+---+ | James| 30| | Anna| 41| | Robert| 62| +---------+---+
上記例のようにselect
に抽出したい特定の列を列挙したリストを渡すことで、列選択をすることが可能です。
式の適用 expr
列の操作をする際には、いくつかの列を使って計算をしたくなることがよくあります。このような場合には、以下のようにexpr
を使用すると便利です。
from pyspark.sql.functions import expr # ===== 式の適用 expr df = df.withColumn("Increased Salary", expr("Salary * Rate")) df.printSchema() df.show()
【実行結果】 root |-- Firstname: string (nullable = true) |-- Lastname: string (nullable = true) |-- Gender: string (nullable = true) |-- Age: integer (nullable = true) |-- Dept: string (nullable = true) |-- Salary: integer (nullable = true) |-- Rate: float (nullable = false) |-- Increased Salary: float (nullable = true) +---------+--------+------+---+-----------+------+----+----------------+ |Firstname|Lastname|Gender|Age| Dept|Salary|Rate|Increased Salary| +---------+--------+------+---+-----------+------+----+----------------+ | James| Smith| M| 30| Sales| 3000| 1.1| 3300.0| | Anna| Rose| F| 41|Engineering| 4000| 1.1| 4400.0| | Robert|Williams| M| 62| Logistics| 5000| 1.1| 5500.0| +---------+--------+------+---+-----------+------+----+----------------+
expr
では、文字列で列を用いてSQL等の式を指定できます。上記で紹介したwithColumn
と合わせて使うことで、計算式を適用した列を簡単に作成することが可能です。
まとめ
PySparkにおいて、DataFrameの行や列を操作する方法について解説しました。
PySparkでは、分散処理環境を使ってデータ分析を効率的に実行することが可能です。DataFrameの行や列の操作は、各種分析などを進めていくにあたっての基本となります。今回は、行や列に対する基本的な操作について各種紹介しました。
ぜひ使い方を覚えてもらって効率的なデータ分析に活用してもらいたいと思います。
上記で紹介しているソースコードについてはgithubにて公開しています。参考にしていただければと思います。