PySparkでDataFrameを結合する方法について解説します。
Contents
PySparkでのDataFrameの結合
PySparkのDataFrameは、Sparkを使用してデータ処理を行うためのデータ構造です。
データを分析する際にはデータソースとして、DB、parquet、csvなどいろいろなデータソースからデータを取得して分析しますが、各データを同じ意味を表すキー列をもとに結合してから分析することがほとんどです。これにより、より多角的な分析が可能になります。データの結合は、データ活用において非常に重要なデータ操作と言えます。
PySparkでは、DataFrameの結合方法としてjoin
メソッドが用意されています。本記事では、join
メソッドの使い方の基本を紹介します。
また、Sparkは分散処理環境であるため結合アルゴリズムにも複数あり、Sparkが適切なアルゴリズムを選択します。どのアルゴリズムが選択されるかはSparkが決定しますが、どのような結合アルゴリズムがあるかを理解しておくことはパフォーマンスチューニングに役立ちます。本記事の後半では、Sparkの結合アルゴリズムの種類についても説明したいと思います。
なお、実行環境としては「PySparkの実行環境をDockerで用意する方法」で説明しているDockerでのJupyter Notebook実行環境を使用しています。
joinを用いたDataFrameの結合
PySparkでは、DataFrameの結合方法としてjoin
メソッドを使用します。以降では、join
メソッドの使い方を例を使って紹介していきます。
使用するDataFrameの作成
まずは、結合操作の説明のための簡単なDataFrameを以下のように作成します。
from pyspark.sql import SparkSession # SparkSessionの初期化 spark = SparkSession.builder.appName("ColumnsOperation").getOrCreate() data_a = [ ("A001", 100), ("B001", 200), ("C001", 300), ("C002", 400), ("E001", 500), ] df_a = spark.createDataFrame(data_a, ["id", "value1"]) df_a.printSchema() df_a.show() data_b = [ ("B001", "aaa"), ("C001", "bbb"), ("D001", "ccc"), ("E002", "ddd"), ] df_b = spark.createDataFrame(data_b, ["id", "value2"]) df_b.printSchema() df_b.show()
【実行結果】 root |-- id: string (nullable = true) |-- value1: long (nullable = true) +----+------+ | id|value1| +----+------+ |A001| 100| |B001| 200| |C001| 300| |C002| 400| |E001| 500| +----+------+ root |-- id: string (nullable = true) |-- value2: string (nullable = true) +----+------+ | id|value2| +----+------+ |B001| aaa| |C001| bbb| |D001| ccc| |E002| ddd| +----+------+
DataFrameの作成に関する説明は省略します。「DataFrameの作成方法とスキーマ」で詳細を説明していますので、興味があれば参考にしてください。
上記例では、df_a
とdf_b
という2つのDataFrameを作成しています。共にid
という列を持っているため、id
列を使った結合をすることを以降で考えていきます。
なお、処理を完了した後にはsparkセッションを終了しますが、以降の説明では省略します。Sparkセッションの終了は重要であり、不要になったセッションはリソースを解放するために適切に終了させるべきであることを覚えておいてください。
# SparkSessionを終了 spark.stop()
以降では、結合で代表的な内部結合(Inner Join)、外部結合(Outer Join)の結合方法について上記データを用いた例で説明していきます。
内部結合 (Inner Join)
内部結合(Inner Join)は、結合対象テーブルで指定した列において両テーブルの行を比較し、共に指定列の値が一致する行のみを使ってテーブルを結合します。つまり、一方のテーブルには存在しても、もう一方には存在しないデータは結合結果に含まれません。
基本的な使い方
PySparkのDataFrameで、内部結合(Inner Join)をする場合は以下のようにします。
# 内部結合 (inner join) df_inner = df_a.join(df_b, on=["id"], how="inner") df_inner.show()
【実行結果】 +----+------+------+ | id|value1|value2| +----+------+------+ |B001| 200| aaa| |C001| 300| bbb| +----+------+------+
上記例では、df_a
に対してdf_b
のテーブルをjoin
メソッドで結合しています。
結合条件に指定する列は、on
引数で指定します。この例では、id
列を指定しています。また、結合の方法はhow
引数で指定します。今回は内部結合のため、how="inner"
としています。
結果を見てみると、df_a
とdf_b
でともに現れるid
の行のみが抽出されて結合されていることが分かるかと思います。
外部結合 (Outer Join)
外部結合(Outer Join)は、結合対象テーブルで指定した列において両テーブルの行を比較したときに、一致しない行も結合結果に含めて結合します。この時、一方のテーブルに存在し、もう一方には存在しないデータの場合には、NULL
で埋められます。
外部結合 (Outer Join)
PySparkのDataFrameで、外部結合(Outer Join)をする場合は以下のようにします。
# 外部結合 (outer join) df_outer = df_a.join(df_b, on=["id"], how="outer") df_outer.show()
【実行結果】 +----+------+------+ | id|value1|value2| +----+------+------+ |A001| 100| NULL| |B001| 200| aaa| |C001| 300| bbb| |C002| 400| NULL| |D001| NULL| ccc| |E001| 500| NULL| |E002| NULL| ddd| +----+------+------+
外部結合を使用する場合は、how
引数でhow="outer"
を指定します。
結果を見てみると、df_a
とdf_b
において、id
が一致していない行についても含めて結合されていることが分かります。なお、例のように一方のテーブルには存在し、もう一方には存在しないデータはNULL
で埋められます。
左外部結合 (Left Outer Join)
左外部結合(Left Outer Join)は、外部結合のうち、左側のテーブルを基準にして結合します。具体的には、左側のテーブルのすべての行を含み、指定した列において一致する行が右側のテーブルにない場合は、NULL
で埋めます。Left Joinと単純に言う場合も、左外部結合のことを指していると思ってください。
PySparkのDataFrameで、左外部結合(Left Outer Join)をする場合は以下のようにします。
# 左外部結合 (left Outer join) df_left_outer = df_a.join(df_b, on=["id"], how="left") df_left_outer.show()
【実行結果】 +----+------+------+ | id|value1|value2| +----+------+------+ |A001| 100| NULL| |B001| 200| aaa| |C001| 300| bbb| |C002| 400| NULL| |E001| 500| NULL| +----+------+------+
左外部結合を使用する場合は、how
引数でhow="left"
を指定します。
ここでいう左とはdf_a
のことです。結果を見ても分かる通り、df_a
のすべて行を含みdf_b
を結合しています。左側のテーブルであるdf_a
に存在し、もう一方のdf_b
には存在しないデータはNULL
で埋められます。
右外部結合 (Right Outer Join)
右外部結合(Right Outer Join)は、外部結合のうち、右側のテーブルを基準にして結合します。具体的には、右側のテーブルのすべての行を含み、指定した列において一致する行が左側のテーブルにない場合は、NULL
で埋めます。Right Joinと単純に言う場合も、右外部結合のことを指していると思ってください。
お気づきかと思いますが、結合するテーブル順を逆にすれば、右外部結合は左外部結合で表現できます。実際の使用においては左外部結合がより一般的です。これは、左から右へという自然な流れにあっているからです。とはいえ、右外部結合の使い方も見ておきましょう。
PySparkのDataFrameで、右外部結合(Right Outer Join)をする場合は以下のようにします。
# 右外部結合 (Right Outer Join) df_right_outer = df_a.join(df_b, on=["id"], how="right") df_right_outer.show()
【実行結果】 +----+------+------+ | id|value1|value2| +----+------+------+ |B001| 200| aaa| |C001| 300| bbb| |D001| NULL| ccc| |E002| NULL| ddd| +----+------+------+
右外部結合を使用する場合は、how
引数でhow="right"
を指定します。
ここでいう右とはdf_bのことです。結果を見ても分かる通り、df_b
のすべての行を含みdf_a
を結合しています。右側のテーブルであるdf_b
に存在し、もう一方のdf_a
には存在しないデータはNULL
で埋められます。
複数列をキーに使用した結合
DataFrameを結合する場合には、複数の列をキーにして結合したくなることがほとんどです。複数列をキーに使用した結合の例も見ておきましょう。
複数キーでの結合を試すために以下のようなdf_c
というDataFrameを作ります。
data_c = [ ("A001", 1, "num1"), ("B001", 200, "num2"), ("C001", 300, "num3"), ("C001", 4, "num4"), ("E001", 500, "num5"), ] df_c = spark.createDataFrame(data_c, ["id", "value1", "value3"]) df_c.printSchema() df_c.show()
【実行結果】 root |-- id: string (nullable = true) |-- value1: long (nullable = true) |-- value3: string (nullable = true) +----+------+------+ | id|value1|value3| +----+------+------+ |A001| 1| num1| |B001| 200| num2| |C001| 300| num3| |C001| 4| num4| |E001| 500| num5| +----+------+------+
id
列とvalue1
列は、df_a
と共通したものとなっています。このデータを用いて、df_a
とdf_c
をid
, value1
をキーに結合するには以下のようにします。
# 複数条件での結合 df_inner_multi = df_a.join(df_c, on=["id", "value1"], how="inner") df_inner_multi.show()
【実行結果】 +----+------+------+ | id|value1|value3| +----+------+------+ |B001| 200| num2| |C001| 300| num3| |E001| 500| num5| +----+------+------+
使い方はこれまで見てきた例とほとんど同じで、on
引数に["id", "value1"]
といった形で複数のキーとなる列をリストで指定します。
結果を見てみても、id
とvalue1
がともに一致する列のみ抽出されていることが分かるかと思います。
異なる列名で結合する場合
複数のデータソースからデータを取得して分析をする場合、同じ意味であるにもかかわらず列名が異なっているようなケースはよくあります。このような場合の対処法についても見ておきましょう。
列名が異なっているような例として、以下のようなdf_d
というDataFrameを作ります。このデータはdf_b
と内容は同じなのですが、id
に該当する列名がno
となっています。
data_d = [ ("B001", "aaa"), ("C001", "bbb"), ("D001", "ccc"), ("E002", "ddd"), ] df_d = spark.createDataFrame(data_d, ["no", "value5"]) df_d.printSchema() df_d.show()
【実行結果】 root |-- no: string (nullable = true) |-- value5: string (nullable = true) +----+------+ | no|value5| +----+------+ |B001| aaa| |C001| bbb| |D001| ccc| |E002| ddd| +----+------+
これまで使用していたdf_a
と上記のdf_d
を結合する例を見てみましょう。
等価演算子(==
)を使用して結合する
異なる列名で結合をする場合に、等価演算子(==
)を使用して結合する方法があります。等価演算子を使用して異なる列名を条件に結合する場合には以下のようにします。
# 等価演算子(==)を使用して異なる列名を条件に結合する df_inner_equal = df_a.join(df_d, df_a.id == df_d.no, how="inner") df_inner_equal.show()
【実行結果】 +----+------+----+------+ | id|value1| no|value5| +----+------+----+------+ |B001| 200|B001| aaa| |C001| 300|C001| bbb| +----+------+----+------+
等価演算子(==
)を使用する場合には、上記例のように「df_a.id == df_d.no
」といった条件を引数に指定します。
結果を見てみると分かりますが条件に一致した行のみ抽出されていることが分かるかと思います。ただし、結合結果にはそれぞれの列(id
とno
)が含まれる結果となります。
列名を統一してから結合する
上記のように等価演算子(==
)を使用することで異なる列名でも結合ができました。しかし、同じ意味の異なる列が結合結果に残ってしまいます。
もし、それぞれの列を残しておく意味がない場合には、以下のように列名を統一してから結合するのがよいでしょう。
# 列名を変更して統一する df_d = df_d.withColumnRenamed("no", "id") df_d.show() # 統一した列名を用いて結合する df_inner_renamed = df_a.join(df_d, on=["id"], how="inner") df_inner_renamed.show()
【実行結果】 +----+------+ | id|value5| +----+------+ |B001| aaa| |C001| bbb| |D001| ccc| |E002| ddd| +----+------+ +----+------+------+ | id|value1|value5| +----+------+------+ |B001| 200| aaa| |C001| 300| bbb| +----+------+------+
列名を統一する場合には、withColumnRenamed
を使用して列名を変更します。上記の例では、df_d
のno
をid
に変更しています。その後、id
列をキーにして結合することで異なる列名に対する結合ができます。
等価演算子(==
)を使用して結合するか、列名を統一してから結合するかは、結合によって求められる状況によって異なります。各状況に応じでどちらで対応するかを検討すると良いでしょう。
結合アルゴリズムの種類
Sparkの結合のjoin
では、結合アルゴリズムをデータセットのサイズ、結合キーの有無、クエリの要件などに基づいて最適なものが選択されます。主な結合のアルゴリズムは以下のようなものがあります。
- Broadcast Hash Join
- Sort Merge Join
- Shuffle Hash Join
- Broadcast Nested Loop Join
- Cartesian Join
一般的に上から順に高速な処理となっています。以降で、各アルゴリズムの概要について説明します。
Broadcast Hash Join
Broadcast Hash Join (ブロードキャストハッシュジョイン)は、一方のデータセットが十分に小さい場合に使用されます。
Sparkは、分散されたノードで処理が実行されます。そのため、例えばマスタテーブルなど小さいデータセットがある場合には、各ノードに先にブロードキャストで配布し、その後で大きなデータセットとハッシュテーブルを使用して結合することで効率的にデータを結合できます。
主に小さなデータセットと大きなデータセットの結合で使用され、処理は高速です。
Sort Merge Join
Sort Merge Join (ソートマージジョイン)は、両方のデータセットが大きい場合に使用されます。
このアルゴリズムは、結合キーに基づいてデータがソート、シャッフルされた後、ソートされたデータを使用してマージ結合が行われます。両方のデータセットが大きく、データの分散が一様でない場合の結合で使用され、効率的に処理がされます。
Shuffle Hash Join
Shuffle Hash Join (シャッフルハッシュジョイン)は、両方のデータセットが中程度のサイズの場合に使用されることが多いアルゴリズムです。
このアルゴリズムでは、結合キーに基づいてデータがシャッフルされ、その後ハッシュベースの結合が行われます。中程度のデータセットには適していますが、Broadcast Hash JoinやSort Merge Joinよりは一般的に少し遅くなります。
Broadcast Nested Loop Join
Broadcast Nested Loop Join (ブロードキャストネステッドループジョイン)は、結合キーがない場合等に使用されます。
このアルゴリズムでは、小さいデータセットが各ノードにブロードキャストされて大きなデータセットのデータに対してループによる結合が行われます。結合キーがない場合等といった通り、組み合わせで結合がされるため通常は避けられるべき結合の種類になります。
上記のjoin
での結合例で紹介したように結合キーを指定して結合する場合には、通常選択されることはありません。
Cartesian Join
Cartesian Join (カルテシアンジョイン)は、結合キーが指定されていない場合で、データセット間のカルテシアン積(すべての可能なペアの結合)を生成する場合に使用されます。
このアルゴリズムは、リソースを大量に消費し非常に遅いプロセスであるため、通常は避けられるべき結合の種類になります。特定の分析や複雑なクエリなどで他の結合方法が適用できな場合に限り使われるものです。
上記のjoin
での結合例で紹介したように結合キーを指定して結合する場合には、通常選択されることはありません。
チューニングの際に考慮するべきこと
Sparkでは、上記のような結合アルゴリズムをデータセットのサイズ、結合キーの有無、クエリの要件などに基づいて最適なものが選択されます。
実行計画を確認するexplain
メソッドや実行結果を確認するSpark UIを用いると、どの結合アルゴリズムが選択されているかを確認することができます。もし遅いとされるBroadcast Nested Loop JoinやCartesian Joinが選択されているような場合には、repartition
等でパーティションサイズを調整したり、クエリを変更するなどを再検討するのが良いでしょう。
また、join
の際に特定のアルゴリズムを使用するようにヒントを与えるhint
メソッドというものもあります。以下は、Broadcast Hash Joinを使用するようにヒントを与える例です。
df = df1.join(df2.hint("broadcast"), on=["Key1"])
上記例では、df2
が小さいデータセットの場合に、df2
をブロードキャストしてから結合するように促すことができます。
また、同様にSort Merge Joinを使用するように促したい場合には、以下のようにすることもできます。
df = df1.join(df2.hint("merge"), on=["Key1"])
なお、注意事項として上記はあくまでヒントであるため、必ずそのアルゴリズムが選択されるわけではないことに理解しておいてください。
どのアルゴリズムが選択されるかは、最終的にはデータセットのサイズや結合キーの有無、クエリの要件によってSparkが自動的に判定して最適なものを選択します。
まとめ
PySparkでDataFrameを結合する方法について解説しました。
PySparkでは、DataFrameの結合方法としてjoin
メソッドが用意されています。本記事では、joinメソッドの使い方の基本を紹介しました。
また、Sparkでは分散環境で動作するため、内部的に効率的な結合アルゴリズムが選択されます。Sparkの結合アルゴリズムの種類についても簡単に説明しました。
パフォーマンスチューニングの際には、どの結合アルゴリズムが選択されているかを確認して必要に応じてrepartition
等でパーティションサイズを調整したり、クエリを変更するなどを再検討するのが良いでしょう。
データ結合は、データ活用において非常に重要なデータ操作です。ぜひ、PySparkでの結合操作について使いこなしてもらいたいと思います。
上記で紹介しているソースコードについてはgithubにて公開しています。参考にしていただければと思います。