PySpark

Sparkアプリケーションの概念を理解する

Sparkアプリケーションの概念を理解する

Sparkの動作を理解するために、Sparkアプリケーションの概念について解説します。

Sparkアプリケーションの概念を理解する

Apache Sparkは、大規模なデータを処理するためのオープンソースの分散処理フレームワークです。Sparkの概要については「Apache SparkとPySparkの概要」でまとめていますので興味があれば参考にしてください。

例えば、Pythonの場合には、PySparkといったAPIを使うことでSparkを簡単に使うことができます。Sparkを使うことで分散処理を自動で実行してくれるため、プログラマの負担は減ります。ただし、Sparkにある概念を理解せず使用していると、実は非常に非効率なコードになっていたり、explainメソッドによる実行計画やSpark UIによるチューニングをしようと思っても何をすればよいか見当もつかなくなってしまいます。

そこで、本記事では、Sparkアプリケーションの主要な概念を理解することで、実装能力の向上やチューニングの際の手がかりとなるような情報を提供することを目的としたいと思います。

具体的には以下のようなトピックスを扱います。

  1. アプリケーション構成と実行ノード
  2. Sparkのデータ構造
  3. Sparkの処理概念

以降では、それぞれの項目について詳細を説明していきます。

アプリケーション構成と実行ノード

Sparkアプリケーションを理解するためには、構成を理解することが重要です。Sparkアプリケーションの構成について説明していきます。

PySpark等のAPIを用いて作成されたユーザープログラムを、Spark Applicationといいます。このSpark Applicationは、Spark Driverがあり処理を実行します。Driverプログラムは、Sparkクラスター上のあるノードで実行されます。また、Spark ExecutorはSparkクラスター上の処理ノードです。Executorは処理を実行する分散ノードで、それぞれに実行のためのコアを複数持ちます。

Spark Applicationの構成

Sparkを実行する際に重要になるのは、Spark Sessionです。Spark Sessionは、Sparkを操作するための入り口のインターフェースで、Spark Applicationでは開発者がSpark Sessionを生成します。

例えば、PySparkでセッションを作成する際には以下のように生成します。

from pyspark.sql import SparkSession

# Sparkセッションの作成
spark = SparkSession.builder.appName("Example").getOrCreate()

上記は、”Example”というSpark Applicationのセッションを作成しています。

Driverプログラムは、処理する内容を1つまたは複数のJobに変換します。これらの各JobはDAG (Directed Acyclic Graph)に変換されます。DAGは、有向非循環グラフのことです。Spark は、データ処理のタスクを DAG としてモデル化することで、最適化された実行計画を作成し、処理の高い効率性を実現しています。

Jobはさらに複数のStageで構成されます。これは、連続または並列で実行できる操作に基づいて作成されます。Stageが変わる際には、ノード間でデータのシャッフルによるデータ転送が発生することがあります。

さらに、各Stageは複数のTaskから構成されます。各Taskは、Executorにおける単一のコアにマッピングされ、データの単一のパーティションに対して処理します。パーティションの考え方については後述します。

Job, Stage, Taskの関係性

Sparkのデータ構造

Sparkでは、処理対象の大規模データを扱うためのデータ構造が用意されています。ここでは、Sparkのデータ構造として理解しておくべき「RDD」「DataFrame」「Dataset」とパーティションの考え方について紹介します。

RDD (Resilient Distributed Dataset)

Sparkでは、主要なデータ構造として、RDD (Resilient Distributed Dataset)というものがあります。

RDDは、不変性(Immutability)を持っているデータ構造で、一度作成すると変更することができません。データの加工などで変更が発生する場合は、新しいRDDが生成されます。また、RDDはクラスタ内の複数ノードにまたがって分散して保持され、Executor内の複数のタスクがRDDの各パーティションを処理します。

また、メモリやディスクにキャッシュや永続化することもできるため、同じRDDに対する複数回操作の計算時間を大幅に短縮できる特徴もあります。

RDDは、元のデータとの間に適用された変換のシーケンスを保持しているため、データが欠損しても元データから再計算により復元する能力を持っています。RDDは、Resilient(復元力のある)、分散された(Distributed)、データセット(Dataset)という名の通り、耐障害性を備えたデータ構造です。

DataFrame

Sparkのデータ構造として、RDDを紹介しましたが、RDDの上に構築された高レベルのAPIとしてDataFrameがあります。

Pythonの分析ライブラリであるpandasのDataFrameに似ていますが、SparkのDataFrameは分散処理能力を備えているものであり似て非なるものであることは理解しておいていただきたいと思います。

SparkのDataFrameからpandasのDataFrameへ変換する方法も提供されていますが、大量のデータを変換する場合、特定のノードのメモリ上でデータが集約される可能性があるので注意が必要です。

DataFrameとRDDのどちらのAPIを使うかという選択がありますが、DataFrameは、Sparkの最適化エンジンであるCatalystにより自動的にクエリ最適化が行われる点や、多くのユーザーにとって使いやすいとされるAPIが提供されているため、基本的にはDataFrameの使用が推奨されます。

ただし、特定の要件で低レベルの操作や細かな制御が必要な場合は、RDDを直接使用することも考慮が必要になります。

Dataset

DataFrameとは別のSparkのデータ構造として、Datasetが提供されています。Datasetは、型安全を提供する点でDataFrameと区別されます。具体的には、DataFrameは、動的型付けを採用しているのに対して、Datasetでは静的型付けとコンパイル時の型検査をサポートしています。

Spark2.0以降で、DataFrameは、Dataset[Row](Row型のDataset)として内部的に表現されています。つまり、DataFrameとDatasetは、同じAPIを使用できます。

Datasetは、静的型付け言語のScalaやJavaで使用可能ですが、動的型付けの言語であるPythonやRではAPIが提供されていません。ScalaやJavaでは、DataFrameも使用できますが、型安全の特徴も考慮してDatasetを使うことが推奨されます。

以上がRDD、DataFrame、Datasetが、Sparkの主要なデータ構造です。使用するデータ構造は、プロジェクトの要件や使用するプログラム言語によって異なるため、それぞれの特性を理解し、適切に選択することが重要です。

パーティション

Apache SparkのApplication構成や処理の概要を見てきました。Sparkの処理の動きを正確に把握するためには、データがどのように分割されて処理されるかが重要になります。その際にポイントとなるのがパーティションという考え方です。ここでは、パーティションの考え方を整理します。

RDD、DataFrame、Datasetというデータ構造では、Spark上での処理の際に、複数のパーティションに分割されてクラスタ内のノード(Executor)に分散されます。Taskの説明でも説明したように、各パーティションごとに各Taskが処理されます。

データ配置については、Sparkが物理的な配置やネットワークを介したデータ移動が最小限になるように最適化されて配置します。例えば、あるキーで集約する(groupByKey等)処理を実行すると、データを処理ノードに再分配するシャッフルという処理が必要になることがあります。シャッフルが発生するとネットワーク越しのデータ移動が発生することからシャッフルがなるべく発生しないようなプログラミングをすることがSparkの性能確保につながります。

パーティションがいくつに分かれて保持されるかは、データの読み込みや処理内容に依存するため、一概にこうなるとは言えません。デフォルトのパーティション数を設定することも可能ですが、例えば、データソース上で複数の小さいCSVファイルを持っている場合、各csvファイルはそれぞれ異なるパーティションとして読み込まれることが一般的です。

処理ノード数に応じてパーティションを調整したい場合は、repartitioncoalesceといったメソッドで調整することも可能です。しかし、このような調整は、ノードやリソース数に基づいて最適なパーティション数を持たせることを目的とします。また、過度に再パーティショニングすると、大量のデータ移動などパフォーマンスの低下の可能性があるため注意が必要です。

基本的には、Sparkの処理を初めて実行する際は、デフォルトのパーティション数やSparkの自動的な最適化に任せるのが最良です。その上で、Spark UIやログを利用して性能のボトルネックやタスクの不均等な分布といった問題を確認した場合、特定の操作やパーティションの調整を行って性能の最適化を図るというアプローチが推奨されます。

また、データ分散やバランスを保つため、特に大規模なシャッフルが発生する可能性がある操作の前後でrepartitioncoalesceといった操作を適切に使用することで処理効率を向上させることができる可能性があります。

以上が、パーティションに関する基本的な考え方と最適化の考え方です。パーティションの数やサイズ、シャッフルの発生を適切に管理し、リソースを効率的に利用することで、Sparkの処理性能を最大限に引き出すことができます。

Sparkの処理概念

Sparkでの処理について理解が必要な概念として「変換 (Transformations)」「アクション (Actions)」「遅延評価 (Lazy Evaluation)」があります。Sparkの効率的で柔軟な基盤を支えるこれらの概念をしっかり理解することが重要です。

Sparkのメソッドは、変換またはアクションのいずれかの操作に分類されます。また、処理の実行は遅延評価により実行されます。概要図は以下のような形になります。以降でそれぞれについて詳細を説明します。

変換(Transformations)、アクション(Actions)、遅延評価(Lazy Evaluation)

変換 (Transformations)

変換(Transformations)のメソッドは、データセット(RDD、DataFrame、Dataset)を別のデータセットに変換します。

例えば、DataFrame/Datasetの代表的な変換メソッドとして、条件行だけ抽出するfilterやグループ化するgroupBy、列を追加または置き換えるwithColumnといったデータセットに対して加工するための処理が含まれます。

変換処理は即時実行されるわけではなく、遅延評価される処理のチェーンとして記録されます。

アクション (Actions)

アクション(Actions)のメソッドは、データセットに対して計算をしたり、外部にデータを出力したりします。アクションメソッドが呼び出されるとSparkは、変換で記録されている操作のチェーンを実行します。

例えば、Dataframe/Datasetの代表的なアクションメソッドとして、行数をカウントするcount、DataFrameを出力するshow、データファイルを書き出すwriteなどがあります。

アクションでは、それまで記録してきた変換処理がアクションの実行タイミングで実行されるため、不要なアクションメソッドを入れてしまうことで全体の最適化に影響が出る可能性がありますので注意しましょう。

遅延評価 (Lazy Evaluation)

上記の説明の通り、Sparkの変換はすぐには実行されません。代わりにSparkは変換のチェーンを記録しておき、アクションが呼び出されたときに初めて処理のチェーンを実行します。

このように必要なタイミングで処理が実行されることを遅延評価 (Lazy Evaluation)といいます。

遅延評価によりSparkは、全体の処理のワークフローを最適化して、不要な計算や中間データの保存などを避けることができ、効率的な分散処理の計算プランを作成することができます。

DataFrame/Datasetにおける代表的な変換/アクションメソッド

DataFrame/Datasetにおける代表的な変換とアクションのメソッドを紹介しておきます。以下はすべてのメソッドというわけではなく、主要なメソッド例ですので必要に応じて公式ドキュメントを調べるようにしてください。

自分が扱っているメソッドが変換かアクションかを常に意識することが、効率的なコードを作成するためには重要です。

変換 (Transformations) メソッド
メソッド名概要
select一つ以上の列を選択する
filter または where条件を満たす行だけを選択する
groupBy一つまたは複数の列でグループ化する
orderBy または sort一つまたは複数の列でソートする
drop指定した列を削除する
withCoulmn新しい列を追加または既存の列を置き換える
withColumnRenamed列名を変更する
aliasDataFrameのエイリアスを設定する
(特に結合操作で便利)
join他のDataFrameと結合する
union二つのDataFrameを連結する
distinct重複行を除去する
repartitionDataFrameを再パーティションする
coalesceパーティション数の数を減らす
crossJoinカルテシアン積を形成するための結合を行う
pivotピボットテーブルの作成に使用する
アクション (Actions) メソッド
メソッド名概要
collect全てのデータをドライバーノードに収集する
(大量データで使用するとメモリの問題が発生する可能性があるため注意が必要)
count行数をカウントする
first または head最初の行を返却する
take最初のN行を返却する
show最初のN行をコンソールに表示する
writeデータをファイルに書き出す
例:write.parquet(), write.csv()
foreach または foreachPartition各行に関数を適用する
agg または aggregate集約操作を行う
reduce行を結合して単一の行を生成する
saveAsTableDataFrameをHive互換のテーブルに保存する

Sparkにおいて、DataFrameやDatasetはRDD上に構築されたAPIで、背後にRDDが存在します。ただし、DataFrameやDatasetにおいて直接RDDのメソッドを使用することはできません。もしRDDのメソッドを利用する場合は、DataFrameやDatasetをRDDに変換する必要があります。

Sparkでは、DataFrameやDatasetの利用が推奨されています。これは、SparkがDataFrameやDatasetの構造と操作を最適化し、より高速な計算を行えるためです。そのため、今回はDataFrame/Datasetに絞ってメソッド例を紹介させていただきました。

RDDの直接操作が必要な場合は、RDDのメソッドを別途調べていただければと思います。

まとめ

Apache Sparkの動作を理解するために、Sparkアプリケーションの概念について解説しました。

PythonでSparkを利用する場合には、PySparkというAPIを使うことが可能ですが、Sparkの概念を理解せずに利用すると、非効率なコードになる可能性があります。

本記事では「アプリケーション構成と実行ノード」「Sparkのデータ構造」「Sparkの処理概念」といった内容について概要を紹介しました。

Sparkアプリケーションの主要な概念の大枠を理解することで、実装能力の向上やチューニングの際の手がかりとなればと思います。