PySpark

Sparkアプリケーションの概念を理解する

Sparkアプリケーションの概念を理解する
naoki-hn

Spark の動作理解のために Spark アプリケーションの概念について解説します。

Spark アプリケーションの概念を理解する

Apache Spark は、大規模なデータを処理するためのオープンソースの分散処理フレームワークです。Spark の概要については「Apache SparkとPySparkの概要」を参考にしてください。

Python の場合には、PySpark といった API を使うことで Spark を簡単に使うことができます。Spark を使うことで分散処理を自動で実行できるためプログラマの負担は減ります。ただし、Spark にある概念を理解せず使用していると、実は非常に非効率なコードになることがあります。また、explain メソッドによる実行計画や Spark UI によるチューニング時にもどのように考えればよいか分からなくなります。

この記事では、Spark アプリケーションの主要概念を理解することで、実装能力の向上やチューニングの際の手がかりとなるような情報を紹介します。

具体的には以下のようなトピックスを扱います。

  1. アプリケーション構成と実行ノード
  2. Spark のデータ構造
  3. Spark の処理概念

以降では、それぞれの項目について詳細を説明していきます。

アプリケーション構成と実行ノード

Spark アプリケーションを理解するためには、構成を理解することが重要です。Spark アプリケーションの構成について説明していきます。

PySpark 等の API を用いて作成されたユーザープログラムを Spark Application といいます。この Spark Application は、Spark Driver で実行されます。Driver プログラムは、Spark クラスター上のあるノードで実行されます。

また、Spark Executor は Spark クラスター上の処理ノードであり、Executor は処理を実行する分散ノードです。それぞれのノードに実行のためのコアを持ちます。

Spark Session は、Spark を操作するための入り口のインターフェースで、Spark Application では開発者が Spark Session を生成して処理を実行します。

Spark Application の構成

例えば、PySpark では以下のように Spark Session を生成します。

from pyspark.sql import SparkSession

# Sparkセッションの作成
spark = SparkSession.builder.appName("Example").getOrCreate()

上記は、"Example" という Spark Session を作成しています。

Driver プログラムは、処理する内容を 1 つまたは複数の Job に変換します。これらの各 Job は DAG (Directed Acyclic Graph) に変換されます。DAG は、有向非循環グラフのことで、Spark は、データ処理のタスクを DAG としてモデル化することで、最適化された実行計画を作成し、処理の高い効率性を実現します。

Job はさらに複数の Stage で構成されます。これは、連続または並列で実行できる操作に基づいて作成されます。Stage が変わる際には、ノード間でデータのシャッフルによるデータ転送が発生することがあります。このシャッフルは、実行速度に影響を与えることがあるため注意が必要なポイントです。

さらに、各 Stage は複数の Task から構成されます。各 Task は、Executor の単一のコアにマッピングされ、データの単一のパーティションに対して処理を行います。パーティションの考え方については後述します。

Job, Stage, Taskの関係性

Spark のデータ構造

Spark では、処理対象の大規模データを扱うためのデータ構造が用意されています。ここでは、Sparkのデータ構造として理解しておくべき「RDD」「DataFrame」「Dataset」とパーティションの考え方について紹介します。

RDD (Resilient Distributed Dataset)

Spark は、RDD (Resilient Distributed Dataset) が主要なデータ構造となっています。

RDD は、不変性 (Immutability) を持つデータ構造で、一度作成すると変更はできません。データの加工などで変更が発生する場合は、新しい RDD が生成されます。また、RDD はクラスタ内の複数ノードにまたがって分散して保持され、Executor 内の複数のタスクが RDD の各パーティションを処理します。

また、メモリやディスクにキャッシュや永続化することもできるため、同じ RDD に対する複数回操作の計算時間を大幅に短縮できる特徴もあります。

RDD は、元のデータとの間に適用された変換のシーケンスを保持するため、データが欠損しても元データから再計算により復元する能力を持っています。RDD は、Resilient (復元力のある)、分散された (Distributed)、データセット (Dataset)という名の通り、耐障害性を備えたデータ構造となっています。

DataFrame

Spark のデータ構造として、RDD を紹介しましたが、RDD の上に構築された高レベルの API として DataFrame があります。

Python の分析ライブラリとして有名な pandas の DataFrame に似ていますが、Spark の DataFrame は分散処理能力を備えているものであり、pandas の DataFrame とは全く違うものです。

Spark の DataFrame から pandas の DataFrame への変換方法も提供されていますが、pandas の DataFrame は分散環境に対応していないため、単一の特定ノードにデータが集約されることになり、速度低下の可能性があるため注意が必要です。

DataFrame と RDD のどちらの API を使うかという選択がありますが、DataFrame は、Spark の最適化エンジンである Catalyst により自動的にクエリ最適化が行われる点や、多くのユーザーにとって使いやすいとされる API が提供されているため、基本的には DataFrame の使用を推奨します。

ただし、特定の要件で低レベルの操作や細かな制御が必要な場合は、RDD を直接使用することも検討が必要です。

Dataset

Spark では、Dataset というデータ構造もが提供されています。Dataset は、型安全である点で DataFrame と区別されます。具体的には、DataFrame は、動的型付けを採用しているのに対して、Dataset では静的型付けとコンパイル時の型検査をサポートしています。

Spark 2.0 以降で、DataFrame は、Dataset[Row]Row 型の Dataset)として内部表現されています。つまり、DataFrame と Dataset は、同じ API を使用できます。

Dataset は、静的型付け言語の Scala や Java で使用可能ですが、動的型付けの言語である Python や R では API が提供されていません。Scala や Java では、DataFrame も使用できますが、型安全の特徴を最大限活用するためにも Dataset を使うことを推奨します。

以上の RDD、DataFrame、Dataset が、Sparkの主要なデータ構造です。使用するデータ構造は、プロジェクト要件や使用するプログラム言語によって異なるため、それぞれの特性を理解し、適切に選択することが重要です。

パーティション

Apache Spark の Application 構成や処理の概要、データ構造について見てきました。Spark の動作を正確に把握するためには、データがどのように分割されて処理されるかが重要です。その際にポイントとなるのがパーティションという考え方です。ここでは、パーティションの考え方を紹介します。

RDD、DataFrame、Dataset というデータ構造では、Spark 上での処理の際に、複数のパーティションに分割されてクラスタ内のノード (Executor) に分散されます。Task の説明でも説明したように、各パーティションごとに各 Task が処理されます。

データ配置は、Spark が物理的な配置やネットワークを介したデータ移動が最小限になるように最適化して配置します。例えば、あるキーで集約する (groupByKey 等) 処理を実行すると、データを処理ノードに再分配するシャッフルという処理が必要になる場合があります。シャッフルが発生するとネットワーク越しのデータ移動が発生するため、シャッフルがなるべく発生しないようなプログラミングをすることが Spark の性能確保のポイントになります。

パーティションがいくつに分かれて保持されるかは、データの読み込みや処理内容に依存するため、一概には言えません。デフォルトのパーティション数を設定することも可能ですが、例えば、データソース上で複数の小さい csv ファイルを持っている場合、各 csv ファイルはそれぞれ異なるパーティションとして読み込まれることが一般的です。

処理ノード数に応じてパーティションを調整したい場合は repartitioncoalesce といったメソッドで調整することが可能です。しかし、このような調整は、ノードやリソース数に基づいて最適なパーティション数を持たせることを目的とします。また、過度に再パーティショニングすると、大量のデータ移動が発生してしまい、パフォーマンスの低下につながる可能性があるため注意が必要です。

基本的には、Spark の処理を初めて実行する際は、デフォルトのパーティション数やSpark の自動的な最適化に任せるのが最良です。その上で、Spark UI やログを利用して性能のボトルネックやタスクの不均等な分布といった問題を確認した場合、特定の操作やパーティションの調整を行って性能の最適化を図るアプローチが推奨されます。

また、データ分散やバランスを保つため、特に大規模なシャッフルが発生する可能性がある操作の前後で repartitioncoalesce といった操作を適切に使用することで処理効率を向上させることができる可能性があります。

以上が、パーティションに関する基本的な考え方と最適化の考え方です。パーティションの数やサイズ、シャッフルの発生を適切に管理し、リソースを効率的に利用することで、Spark の処理性能を最大限に引き出すことができます。

Spark の処理概念

Sparkでの処理について理解が必要な概念として「変換 (Transformations)」「アクション (Actions)」「遅延評価 (Lazy Evaluation)」があります。Spark の効率的で柔軟な基盤を支える概念であるため、しっかり理解しましょう。

Spark のメソッドは、変換またはアクションのいずれかの操作に分類されます。また、処理の実行は遅延評価により実行されます。概要図は以下のような形になります。以降でそれぞれについて詳細を説明します。

変換(Transformations)、アクション(Actions)、遅延評価(Lazy Evaluation)

変換 (Transformations)

変換 (Transformations) のメソッドは、データセット (RDD、DataFrame、Dataset) を別のデータセットに変換します。

例えば、DataFrame / Dataset の代表的な変換メソッドとして、条件行だけ抽出する filter やグループ化する groupBy、列を追加または置き換える withColumn といったデータセットに対して加工するための処理が含まれます。

変換処理は、即時実行はされず、遅延評価される処理のチェーンとして記録されます。

アクション (Actions)

アクション (Actions) のメソッドは、データセットに対して計算をしたり、外部にデータを出力したりします。アクションメソッドが呼び出されると Spark は、変換で記録されている操作のチェーンを順に実行します。

例えば、Dataframe / Datasetの代表的なアクションメソッドとして、行数をカウントする count、DataFrame を出力する show、データファイルを書き出す write などがあります。

アクションでは、それまで記録してきた変換処理が最適化されて実行されます。そのため、処理の途中で show などのアクションメソッドを不必要に入れてしまうと、全体としての最適化ができずに応答性能に影響が出る可能性があるので注意しましょう。 

遅延評価 (Lazy Evaluation)

上記の説明の通り、Spark の変換はすぐには実行されません。代わりに Spark は変換のチェーンを記録しておき、アクションが呼び出されたときに初めて処理のチェーンを実行します。

このように必要なタイミングで処理が実行されることを遅延評価 (Lazy Evaluation)といいます。

遅延評価により Spark は、全体の処理のワークフローを最適化して、不要な計算や中間データの保存などを避けるように計算プランを作成します。これにより、Spark では高速な分散処理が実現できます。

DataFrame / Datasetにおける代表的な変換/アクションメソッド

DataFrame / Dataset における代表的な変換とアクションのメソッドを紹介します。以下はすべてのメソッドというわけではなく、主要なメソッド例ですので必要に応じて公式ドキュメントを調べてください。

扱っているメソッドが変換かアクションかを常に意識することで、効率なコードを作成することができるようになります。

変換 (Transformations) メソッド
メソッド名概要
select1 つ以上の列を選択する
filter または where条件を満たす行だけを選択する
groupBy1 つまたは複数の列でグループ化する
orderBy または sort1 つまたは複数の列でソートする
drop指定した列を削除する
withCoulmn新しい列を追加または既存の列を置き換える
withColumnRenamed列名を変更する
aliasDataFrame のエイリアスを設定する
(特に結合操作で便利)
join他の DataFrame と結合する
union2 つの DataFrame を連結する
distinct重複行を除去する
repartitionDataFrame を再パーティションする
coalesceパーティション数の数を減らす
crossJoinカルテシアン積を形成するための結合を行う
pivotピボットテーブルの作成に使用する
アクション (Actions) メソッド
メソッド名概要
collect全てのデータをドライバーノードに収集する
(大量データで使用するとメモリの問題が発生する可能性があるため注意が必要)
count行数をカウントする
first または head最初の行を返却する
take最初の N 行を返却する
show最初の N 行をコンソールに表示する
writeデータをファイルに書き出す
例:write.parquet()write.csv()
foreach または foreachPartition各行に関数を適用する
agg または aggregate集約操作を行う
reduce行を結合して単一の行を生成する
saveAsTableDataFrame を Hive 互換のテーブルに保存する
注意点

Spark において、DataFrame や DataSet は RDD 上に構築された API で、背後に RDD が存在します。ただし、DataFrame や DataSet において直接 RDD のメソッドを使用することはできません。もし、RDD のメソッドを使用したい場合には、DataFrame や DataSet を RDD に変換する必要があります。

Spark では、DataFrame や DataSet の利用が推奨されています。これは、Spark が DataFrame や DataSet の構造と操作を最適化し、より高速な計算を行えるためです。そのため、今回は DataFrame / DataSet に絞ってメソッドの例を紹介させていただきました。

RDD の直接操作が必要な場合は、RDD のメソッドを別途調べていただければと思います。

まとめ

Apache Spark の動作を理解するために Spark アプリケーションの概念について解説しました。

Python で Spark を利用する場合には、PySpark という API を使うことが可能ですが、Spark の概念を理解せずに利用すると、非効率なコードになる可能性があります。

この記事では「アプリケーション構成と実行ノード」「Sparkのデータ構造」「Sparkの処理概念」といった内容について紹介しました。

Spark アプリケーションの主要な概念の大枠を理解することで、実装能力の向上やチューニングの際の手がかりとなればと思います。

あわせて読みたい
【Python Tech】プログラミングガイド
【Python Tech】プログラミングガイド
ABOUT ME
ホッシー
ホッシー
システムエンジニア
はじめまして。当サイトをご覧いただきありがとうございます。 私は製造業のメーカーで、DX推進や業務システムの設計・開発・導入を担当しているシステムエンジニアです。これまでに転職も経験しており、以前は大手電機メーカーでシステム開発に携わっていました。

プログラミング言語はこれまでC、C++、JAVA等を扱ってきましたが、最近では特に機械学習等の分析でも注目されているPythonについてとても興味をもって取り組んでいます。これまでの経験をもとに、Pythonに興味を持つ方のお役に立てるような情報を発信していきたいと思います。どうぞよろしくお願いいたします。

※キャラクターデザイン:ゼイルン様
記事URLをコピーしました