PySpark

【PySpark】DataFrameを結合する方法 join

【PySpark】DataFrameを結合する方法 join
naoki-hn

PySpark で DataFrameを結合する方法について解説します。

PySpark での DataFrame 結合

PySpark は、分散処理フレームワーク Apache Spark の Python 用 API です。PySpark では、DataFrame というデータ構造を使用します。

データを分析する際にはデータソースとして DB、Parquet、CSV などいろいろなデータソースからデータを取得して分析します。この際に、各データソースのデータを同じ意味を表すキー列をもとに結合してから分析することがほとんどです。これにより、より多角的な分析が可能になります。データ結合は、データ活用において非常に重要なデータ操作となります。

PySpark では、DataFrame の結合方法として join メソッドが用意されています。この記事では、join メソッドの使い方の基本を紹介します。

また、Spark は分散処理環境であるため結合アルゴリズムも複数あり、Spark が適切なアルゴリズムを選択します。どのアルゴリズムが選択されるかは Spark が決定しますが、どのような結合アルゴリズムがあるかを理解しておくことはパフォーマンスチューニングに役立ちます。後半では、Spark の結合アルゴリズムの種類についても概要を説明したいと思います。

実行環境は、Docker で構築した Spark 環境の Jupyter Notebook を使用します。環境構築方法は「PySparkの実行環境をDockerで用意する方法」を参考にしてください。

Spark と PySpark 概要や Spark アプリケーションの概念は「Apache SparkとPySparkの概要」や「Sparkアプリケーションの概念を理解する」を参考にしてください。

join を用いた DataFrame の結合

PySpark では、DataFrame の結合方法として join メソッドを使用します。以降では、join メソッドの使い方を例を使って紹介していきます。

使用する DataFrame の作成

結合操作の説明のための簡単な DataFrame を以下のように作成します。

from pyspark.sql import SparkSession

# SparkSessionの初期化
spark = SparkSession.builder.appName("ColumnsOperation").getOrCreate()

data_a = [
    ("A001", 100),
    ("B001", 200),
    ("C001", 300),
    ("C002", 400),
    ("E001", 500),
]
df_a = spark.createDataFrame(data_a, ["id", "value1"])
df_a.printSchema()
df_a.show()

data_b = [
    ("B001", "aaa"),
    ("C001", "bbb"),
    ("D001", "ccc"),
    ("E002", "ddd"),
]
df_b = spark.createDataFrame(data_b, ["id", "value2"])
df_b.printSchema()
df_b.show()
【実行結果】
root
 |-- id: string (nullable = true)
 |-- value1: long (nullable = true)

+----+------+
|  id|value1|
+----+------+
|A001|   100|
|B001|   200|
|C001|   300|
|C002|   400|
|E001|   500|
+----+------+

root
 |-- id: string (nullable = true)
 |-- value2: string (nullable = true)

+----+------+
|  id|value2|
+----+------+
|B001|   aaa|
|C001|   bbb|
|D001|   ccc|
|E002|   ddd|
+----+------+

上記例では、df_adf_bという2つのDataFrameを作成しています。共にidという列を持っているため、id列を使った結合をすることを以降で考えていきます。

なお、処理を完了した後は SparkSession を終了しますが、説明では省略します。

# SparkSessionを終了
spark.stop()

以降では、結合で代表的な内部結合 (Inner Join)、外部結合 (Outer Join) の各方法について上記データを用いた例で説明していきます。

DataFrameの作成に関する説明は省略しています。「DataFrameの作成方法とスキーマ」を参考にしてください。

内部結合 (Inner Join)

内部結合 (Inner Join) は、結合対象テーブルで指定した列で両テーブルの行を比較し、共に指定列の値が一致する行のみを使ってテーブルを結合します。つまり、片方のテーブルに存在しても、もう一方には存在しないデータは結合結果に含まれません。

基本的な使い方

PySpark の DataFrame での内部結合 (Inner Join) は以下のようにします。

# 内部結合 (inner join)
df_inner = df_a.join(df_b, on=["id"], how="inner")
df_inner.show()
【実行結果】
+----+------+------+
|  id|value1|value2|
+----+------+------+
|B001|   200|   aaa|
|C001|   300|   bbb|
+----+------+------+

例では、df_a に対して df_b のテーブルを join メソッドで結合しています。

結合条件に指定する列は on 引数で指定します。例では、id 列を指定しています。また、結合方法は how 引数で指定します。内部結合のため how="inner" とします。

結果を見てみると df_adf_b でともに現れる id の行のみが抽出されて結合されていることが分かります。

外部結合 (Outer Join)

外部結合 (Outer Join) は、結合対象テーブルで指定した列において両テーブルの行を比較したときに、一致しない行も結合結果に含めて結合します。この時、片方のテーブルに存在し、もう一方には存在しないデータの場合には NULL で埋められます。

外部結合 (Outer Join)

PySpark の DataFrame での外部結合 (Outer Join) は以下のようにします。

# 外部結合 (outer join)
df_outer = df_a.join(df_b, on=["id"], how="outer")
df_outer.show()
【実行結果】
+----+------+------+
|  id|value1|value2|
+----+------+------+
|A001|   100|  NULL|
|B001|   200|   aaa|
|C001|   300|   bbb|
|C002|   400|  NULL|
|D001|  NULL|   ccc|
|E001|   500|  NULL|
|E002|  NULL|   ddd|
+----+------+------+

外部結合を使用する場合は、 how 引数で how="outer" を指定します。

結果を見てみると df_adf_b において id が一致していない行についても含めて結合されていることが分かります。例のように片方のテーブルには存在し、もう一方には存在しないデータは NULL で埋められます。

左外部結合 (Left Outer Join)

左外部結合(Left Outer Join) は、外部結合のうち、左側のテーブルを基準にして結合します。具体的には、左側のテーブルのすべての行を含み、指定した列において一致する行が右側のテーブルにない場合は NULLで埋めます。Left Join と単純に言う場合は左外部結合のことを指していると思ってください。

PySpark の DataFrame で左外部結合 (Left Outer Join) は以下のようにします。

# 左外部結合 (left Outer join)
df_left_outer = df_a.join(df_b, on=["id"], how="left")
df_left_outer.show()
【実行結果】
+----+------+------+
|  id|value1|value2|
+----+------+------+
|A001|   100|  NULL|
|B001|   200|   aaa|
|C001|   300|   bbb|
|C002|   400|  NULL|
|E001|   500|  NULL|
+----+------+------+

左外部結合を使用する場合は、how 引数で how="left" を指定します。

ここでいう左とは df_a のことです。結果を見ると df_a のすべて行を含み df_b を結合しています。左側のテーブルである df_a に存在し、もう一方の df_b には存在しないデータは NULL で埋められます。

右外部結合 (Right Outer Join)

右外部結合 (Right Outer Join) は、外部結合のうち、右側のテーブルを基準にして結合します。具体的には、右側のテーブルのすべての行を含み、指定した列において一致する行が左側のテーブルにない場合は NULL で埋めます。Right Join と単純に言う場合は右外部結合のことを指していると思ってください。

結合するテーブル順を逆にすれば、右外部結合は左外部結合でも表現できます。実際の使用においては左外部結合がより一般的です。これは、左から右へという自然な流れにあっているからです。とはいえ、右外部結合の使い方も知っておきましょう。

PySpark の DataFrame で右外部結合 (Right Outer Join) は以下のようにします。

# 右外部結合 (Right Outer Join)
df_right_outer = df_a.join(df_b, on=["id"], how="right")
df_right_outer.show()
【実行結果】
+----+------+------+
|  id|value1|value2|
+----+------+------+
|B001|   200|   aaa|
|C001|   300|   bbb|
|D001|  NULL|   ccc|
|E002|  NULL|   ddd|
+----+------+------+

右外部結合を使用する場合は、how 引数で how="right" を指定します。

ここでいう右とは df_b のことです。結果を見ると df_b のすべての行を含み df_a を結合しています。右側のテーブルである df_b に存在し、もう一方の df_a には存在しないデータは NULL で埋められます。

複数列をキーに使用した結合

DataFrame を結合する場合には、複数列をキーにして結合したくなることがほとんどです。複数キーでの結合するために df_c という DataFrame を作っておきます。

data_c = [
    ("A001", 1, "num1"),
    ("B001", 200, "num2"),
    ("C001", 300, "num3"),
    ("C001", 4, "num4"),
    ("E001", 500, "num5"),
]
df_c = spark.createDataFrame(data_c, ["id", "value1", "value3"])
df_c.printSchema()
df_c.show()
【実行結果】
root
 |-- id: string (nullable = true)
 |-- value1: long (nullable = true)
 |-- value3: string (nullable = true)

+----+------+------+
|  id|value1|value3|
+----+------+------+
|A001|     1|  num1|
|B001|   200|  num2|
|C001|   300|  num3|
|C001|     4|  num4|
|E001|   500|  num5|
+----+------+------+

id 列と value1 列は、df_a と共通したものとなっています。このデータを用いて df_adf_cidvalue1 をキーに結合してみます。

# 複数条件での結合
df_inner_multi = df_a.join(df_c, on=["id", "value1"], how="inner")
df_inner_multi.show()
【実行結果】
+----+------+------+
|  id|value1|value3|
+----+------+------+
|B001|   200|  num2|
|C001|   300|  num3|
|E001|   500|  num5|
+----+------+------+

使い方はこれまで見てきた例とほとんど同じで on 引数に ["id", "value1"] といった形で複数のキーとなる列をリストで指定します。

結果を見ると idvalue1 がともに一致する列のみ抽出されています。

異なる列名で結合する場合

複数データソースからデータを取得して分析をする場合、同じ意味でも列名が異なるケースはよくあります。このような場合の対処法についても見ておきましょう。

列名が異なる例として df_d という DataFrame を作っておきます。このデータは df_b と内容は同じなのですが id に該当する列名が no となっています。

data_d = [
    ("B001", "aaa"),
    ("C001", "bbb"),
    ("D001", "ccc"),
    ("E002", "ddd"),
]
df_d = spark.createDataFrame(data_d, ["no", "value5"])
df_d.printSchema()
df_d.show()
【実行結果】
root
 |-- no: string (nullable = true)
 |-- value5: string (nullable = true)

+----+------+
|  no|value5|
+----+------+
|B001|   aaa|
|C001|   bbb|
|D001|   ccc|
|E002|   ddd|
+----+------+

これまで使用していた df_a と上記の df_d を結合する例を見てみましょう。

等価演算子 (==) を使用して結合する

異なる列名で結合する場合に、等価演算子 (==) を使用して結合する方法があります。等価演算子を使用して異なる列名を条件に結合する場合には以下のようにします。

# 等価演算子(==)を使用して異なる列名を条件に結合する
df_inner_equal = df_a.join(df_d, df_a.id == df_d.no, how="inner")
df_inner_equal.show()
【実行結果】
+----+------+----+------+
|  id|value1|  no|value5|
+----+------+----+------+
|B001|   200|B001|   aaa|
|C001|   300|C001|   bbb|
+----+------+----+------+

等価演算子 (==) を使用する場合には、例のように「df_a.id == df_d.no」といった条件を引数に指定します。

結果を見ると条件に一致した行のみ抽出されていることが分かるかと思います。ただし、結合結果にはそれぞれの列 (idno) が含まれる結果となります。

列名を統一してから結合する

上記のように等価演算子 (==) を使用することで異なる列名でも結合ができました。しかし、同じ意味の異なる列が結合結果に残ってしまいます。

それぞれの列を残しておく必要がない場合には、列名を統一してから結合します。

# 列名を変更して統一する
df_d = df_d.withColumnRenamed("no", "id")
df_d.show()

# 統一した列名を用いて結合する
df_inner_renamed = df_a.join(df_d, on=["id"], how="inner")
df_inner_renamed.show()
【実行結果】
+----+------+
|  id|value5|
+----+------+
|B001|   aaa|
|C001|   bbb|
|D001|   ccc|
|E002|   ddd|
+----+------+

+----+------+------+
|  id|value1|value5|
+----+------+------+
|B001|   200|   aaa|
|C001|   300|   bbb|
+----+------+------+

列名を統一する場合には、withColumnRenamed を使用して列名を変更します。例では、df_dnoid に変更しています。その後、id 列をキーにして結合することで異なる列名に対する結合ができます。

等価演算子 (==) を使用して結合するか、列名を統一してから結合するかは、結合によって求められる状況によって異なります。各状況に応じでどちらで対応するかを十分に検討してください。

結合アルゴリズムの種類

Spark の結合の join では、結合アルゴリズムをデータセットのサイズ、結合キーの有無、クエリの要件などに基づいて最適なものが選択します。主な結合のアルゴリズムは以下のようなものがあります。

  1. Broadcast Hash Join
  2. Sort Merge Join
  3. Shuffle Hash Join
  4. Broadcast Nested Loop Join
  5. Cartesian Join

一般的には、上から順に高速な処理と言われます。ただし、実際の性能はケースによるためご注意ください。以降で、各アルゴリズムの概要について説明します。

Broadcast Hash Join

Broadcast Hash Join (ブロードキャストハッシュジョイン)は、片方のデータセットが十分に小さい場合に使用されます。

Spark は、分散されたノードで処理が実行されます。そのため、例えばマスタテーブルなど小さいデータセットがある場合には、各ノードに先にブロードキャストで配布し、その後で大きなデータセットとハッシュテーブルを使用して結合することで効率的にデータを結合します。

主に小さなデータセットと大きなデータセットの結合で使用され、処理は高速です。

Sort Merge Join

Sort Merge Join (ソートマージジョイン)は、両方のデータセットが大きい場合に使用されます。

このアルゴリズムは、結合キーに基づいてデータがソート、シャッフルされた後、ソートされたデータを使用してマージ結合が行われます。両方のデータセットが大きく、データの分散が一様でない場合の結合で使用され、効率的に処理がされます。

Shuffle Hash Join

Shuffle Hash Join (シャッフルハッシュジョイン)は、両方のデータセットが中程度のサイズの場合に使用されることが多いアルゴリズムです。

このアルゴリズムでは、結合キーに基づいてデータがシャッフルされ、その後ハッシュベースの結合が行われます。中程度のデータセットには適していますが Broadcast Hash Join や Sort Merge Join よりは一般的に少し遅くなります。

Broadcast Nested Loop Join

Broadcast Nested Loop Join (ブロードキャストネステッドループジョイン)は、結合キーがない場合等に使用されます。

このアルゴリズムでは、小さいデータセットが各ノードにブロードキャストされて大きなデータセットのデータに対してループによる結合が行われます。結合キーがない場合といった通り、組み合わせで結合するため通常避けられるべき結合の種類です。

上記の join での結合例で紹介したように結合キーを指定して結合する場合には、通常選択されることはありません。

Cartesian Join

Cartesian Join (カルテシアンジョイン)は、結合キーが指定されていない場合で、データセット間のカルテシアン積(すべての可能なペアの結合)を生成する場合に使用されます。

このアルゴリズムは、リソースを大量に消費し非常に遅いプロセスであるため、通常は避けられるべき結合の種類であり、特定の分析や複雑なクエリなどで他の結合方法が適用できな場合に限り使われるものです。

上記の join での結合例で紹介したように結合キーを指定して結合する場合には、通常選択されることはありません。

チューニングの際に考慮するべきこと

Spark では、上記のような結合アルゴリズムをデータセットのサイズ、結合キーの有無、クエリの要件などに基づいて最適なものが選択されます。

実行計画を確認する explain メソッドや実行結果を確認する Spark UI を用いると、どの結合アルゴリズムが選択されているかを確認することができます。もし遅いとされる Broadcast Nested Loop Join や Cartesian Join が選択されているような場合には、repartition 等でパーティションサイズを調整したり、クエリを変更するなどを再検討するのが良いでしょう。

また、join の際に特定のアルゴリズムを使用するようにヒントを与える hint メソッドというものもあります。以下は、Broadcast Hash Join を使用するようにヒントを与える例です。

df = df1.join(df2.hint("broadcast"), on=["Key1"])

上記例では df2 が小さいデータセットの場合に df2 をブロードキャストしてから結合するように促すことができます。

また、同様に Sort Merge Join を使用するように促したい場合には、以下のようにすることもできます。

df = df1.join(df2.hint("merge"), on=["Key1"])

なお、注意事項として上記はあくまでヒントであるため必ずそのアルゴリズムが選択されるわけではないことに注意してください。

どのアルゴリズムが選択されるかは、最終的にはデータセットのサイズや結合キーの有無、クエリの要件によって Spark が自動的に判定して最適なものを選択します。

まとめ

PySpark で DataFrame を結合する方法について解説しました。

PySpark では、DataFrameの結合方法として join メソッドが用意されています。この記事では join メソッドの使い方の基本を紹介しました。

また、Spark では分散環境で動作するため、内部的に効率的な結合アルゴリズムが選択されます。Spark の結合アルゴリズムの種類についても簡単に説明しました。

パフォーマンスチューニングの際には、どの結合アルゴリズムが選択されているかを確認して必要に応じて repartition 等でパーティションサイズを調整したり、クエリを変更するなどの検討が必要になります。

データ結合は、データ活用において非常に重要なデータ操作です。ぜひ、PySpark での結合操作をうまく使いこなしてもらいたいと思います。

ソースコード

上記で紹介しているソースコードについては GitHub にて公開しています。参考にしていただければと思います。

あわせて読みたい
【Python Tech】プログラミングガイド
【Python Tech】プログラミングガイド
ABOUT ME
ホッシー
ホッシー
システムエンジニア
はじめまして。当サイトをご覧いただきありがとうございます。 私は製造業のメーカーで、DX推進や業務システムの設計・開発・導入を担当しているシステムエンジニアです。これまでに転職も経験しており、以前は大手電機メーカーでシステム開発に携わっていました。

プログラミング言語はこれまでC、C++、JAVA等を扱ってきましたが、最近では特に機械学習等の分析でも注目されているPythonについてとても興味をもって取り組んでいます。これまでの経験をもとに、Pythonに興味を持つ方のお役に立てるような情報を発信していきたいと思います。どうぞよろしくお願いいたします。

※キャラクターデザイン:ゼイルン様
記事URLをコピーしました