見出し画像

【翻訳】Spark3.0 パフォーマンス最適化【GPT-4oによる】

Part1

Apache Sparkは、大規模データ処理と分析のための強力な機能を提供するビッグデータ処理の基盤となっています。Spark 3.0のリリースに伴い、最も重要な改善の一つとして導入されたのが、Adaptive Query Execution(AQE)です。この機能は、ランタイム統計に基づいてクエリプランを動的に最適化および調整することで、Spark SQLのパフォーマンスを向上させることを目的としています。このブログでは、AQEとは何か、その仕組み、そしてSparkジョブを最適化するためにどのように活用できるかを探っていきます。

AQEの主な特徴

動的なシャッフルパーティションの統合:AQEは、処理中の実際のデータサイズに基づいてランタイムでシャッフルパーティションの数を減らすことができます。これにより、多くの小さなパーティションを管理するオーバーヘッドが減り、全体のクエリパフォーマンスが向上します。
偏ったデータの処理:AQEはデータの偏りを検出し、大きなパーティションを小さなパーティションに分割することで対応できます。これにより、特定のタスクが不均衡に長時間かかることを防ぎ、負荷分散が改善されクエリ実行時間が短縮されます。
結合戦略の最適化:ランタイム統計に基づいて、AQEは結合戦略を動的に変更できます。例えば、テーブルの一方のサイズがメモリに収まる程度に小さい場合、ソートマージ結合からブロードキャスト結合に切り替えることができます。

Spark 3.0でのAQEの有効化

SparkでAQEを有効化するのは簡単です。Sparkアプリケーションを起動する際に必要な設定を行うか、Sparkセッション内で動的に設定できます。

AQEを有効にするには、以下の設定を行います:

spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", true)
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", 1)
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)

動的なシャッフルパーティションの統合の例

  1. `spark-shell`を開きます。

spark-shell
  1. サンプルの売上データをインポートします。

import spark.implicits._

val salesData = Seq(
  ("2021-01-01", "Store_Aurangabad", 100),
  ("2021-01-01", "Store_Mumbai", 150),
  ("2021-01-02", "Store_Jalna", 200),
  ("2021-01-02", "Store_Mumbai", 50),
  ("2021-01-03", "Store_Mumbai", 300),
  ("2021-01-03", "Store_Pune", 250)
)

// DataFrameを作成
val salesDF = salesData.toDF("date", "store", "amount")

// DataFrameを表示
salesDF.show()

// グループバイ操作を実行し、シャッフルをトリガー
val aggregatedDF = salesDF.groupBy("store").sum("amount")

// 集計されたDataFrameを表示
aggregatedDF.show()

// AQEを無効化
spark.conf.set("spark.sql.adaptive.enabled", false)
val aggregatedDFNoAQE = salesDF.groupBy("store").sum("amount")
aggregatedDFNoAQE.show()

出力例

salesDF.show()
+----------+------+------+
|      date| store|amount|
+----------+------+------+
|2021-01-01|store1|   100|
|2021-01-01|store2|   150|
|2021-01-02|store1|   200|
|2021-01-02|store2|    50|
|2021-01-03|store1|   300|
|2021-01-03|store2|   250|
+----------+------+------+

aggregatedDF.show()
+------+-----------+                                                            
| store|sum(amount)|
+------+-----------+
|store1|        600|
|store2|        450|
+------+-----------+

aggregatedDFNoAQE.show()
+------+-----------+                                                            
| store|sum(amount)|
+------+-----------+
|store1|        600|
|store2|        450|
+------+-----------+

以下のスクリーンショットは、AQEが有効な場合と無効な場合の実行の違いを示しています。

AQEが有効(デフォルトのシャッフルパーティション200と異なるシャッフルパーティション)

AQEが無効(デフォルトのシャッフルパーティション200と同じ、残り6は異なるタスク用)


![[Pasted image 20240526090345.png]]

Spark 3.0のAdaptive Query Executionは、クエリプランをランタイムで動的に最適化することで、著しいパフォーマンス向上をもたらす強力な機能です。AQEを有効にすることで、シャッフルパーティションの効率的な処理、偏ったデータの管理の改善、および適応的な結合戦略による、より迅速で信頼性の高いクエリ実行が可能になります。

このブログでは、Sparkがシャッフルパーティションを動的に統合する方法を見てきました。残りの2つの機能については、次回のブログで取り上げます。

Part2

このブログは、Spark 3.0におけるAdaptive Query Execution(AQE)の続編であり、残りの主要な機能について説明します。

AQEの主な特徴

動的なシャッフルパーティションの統合: AQEは、処理中の実際のデータサイズに基づいてランタイムでシャッフルパーティションの数を減らすことができます。これにより、多くの小さなパーティションを管理するオーバーヘッドが減り、全体のクエリパフォーマンスが向上します。

偏ったデータの処理: AQEはデータの偏りを検出し、大きなパーティションを小さなパーティションに分割することで対応します。これにより、特定のタスクが不均衡に長時間かかることを防ぎ、負荷分散が改善されクエリ実行時間が短縮されます。
結合戦略の最適化: ランタイム統計に基づいて、AQEは結合戦略を動的に変更できます。例えば、テーブルの一方のサイズがメモリに収まる程度に小さい場合、ソートマージ結合からブロードキャスト結合に切り替えることができます。

偏ったデータの処理

Apache Spark 3.0は、ランタイム統計に基づいてクエリプランを動的に最適化するためにAdaptive Query Execution(AQE)を導入しました。AQEが提供する大きな改善の一つは、クエリパフォーマンスに重大な影響を与えるデータの偏りを処理することです。このブログでは、AQEがどのようにデータの偏りを処理するかを例を通して説明し、この機能を示す物理プランを分析します。

データの偏りとは?

データの偏りは、パーティション間でデータの分布が不均一である場合に発生します。これにより、一部のタスクが他のタスクよりもはるかに長く実行されることになり、リソースの非効率な利用とクエリ実行時間の増加を引き起こします。

AQEが偏ったデータを処理する方法

AQEは、ランタイムで大きなパーティションを小さなパーティションに分割することで、データの偏りを動的に検出し処理します。これにより、どのタスクも過剰なデータを抱えることがなくなり、負荷分散が改善され、クエリ実行が迅速になります。

例:AQEで偏ったデータを処理する

AQEが実際に偏ったデータをどのように処理するかを例で見ていきましょう。

設定

spark-shell

// ブロードキャストレベルの設定を無効化
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1")
spark.conf.set("spark.sql.join.preferSortMergeJoin", true)

// AQEレベルの設定
spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "1")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", false)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "10KB")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "1B")

// シャッフルパーティションの数を10に設定
spark.conf.set("spark.sql.shuffle.partitions", "10")

メインコード

var cityData = Seq.fill(100000)("Aurangabad") ++ Seq.fill(100)("Mumbai") ++ Seq.fill(10)("Chennai") ++ Seq("Hyderabad")

// 都市データのDataFrameを作成
var cityDF = cityData.toDF("city")

// 州データの生成
var stateData = Seq(("Aurangabad", "Maharashtra"), 
                    ("Mumbai", "Maharashtra"), 
                    ("Chennai", "TamilNadu"),
                    ("Hyderabad", "Telangana"))

// 州データのDataFrameを作成
var stateDF = stateData.toDF("cityname", "state")

var joinedDF = cityDF.join(stateDF, cityDF("city") === stateDF("cityname"), "inner")

joinedDF.filter("cityname='Mumbai'").count
joinedDF.filter("cityname='Aurangabad'").count
joinedDF.filter("cityname='Chennai'").count
joinedDF.filter("cityname='Hyderabad'").count

出力

scala> joinedDF.filter("cityname='Mumbai'").count
res11: Long = 100

scala> joinedDF.filter("cityname='Aurangabad'").count
res12: Long = 100000                                                            

scala> joinedDF.filter("cityname='Chennai'").count
res13: Long = 10

scala> joinedDF.filter("cityname='Hyderabad'").count
res14: Long = 1

実行時の物理実行プラン

AdaptiveSparkPlan (19)
+- == Final Plan ==
   CollectLimit (13)
   +- * Project (12)
      +- * SortMergeJoin(skew=true) Inner (11)
         :- * Sort (5)
         :  +- AQEShuffleRead (4)
         :     +- ShuffleQueryStage (3), Statistics(sizeInBytes=3.1 MiB, rowCount=1.00E+5)
         :        +- Exchange (2)
         :           +- LocalTableScan (1)
         +- * Sort (10)
            +- AQEShuffleRead (9)
               +- ShuffleQueryStage (8), Statistics(sizeInBytes=208.0 B, rowCount=4)
                  +- Exchange (7)
                     +- LocalTableScan (6)

Spark UIのSQL/DataFrameの実行プラン

AQE有効時の実行プラン

  • AdaptiveSparkPlan: AQEが使用されていることを示します。

  • SortMergeJoin (skew=true): AQEが結合操作での偏りを検出しています。

  • CustomShuffleReader coalesced: AQEがシャッフルパーティションを動的に調整して、偏ったデータを処理しています。これは、大きなパーティションを小さなパーティションに分割することを含みます。

主要な違い

  1. 偏りの検出: AQE有効時、Sparkは偏ったデータを検出します(skew=true)。

  2. パーティションの調整: AQEはCustomShuffleReader coalescedを使用して、データの分布が均等になるようにパーティションを動的に調整します。

  3. 適応実行: AdaptiveSparkPlanの存在は、プランがランタイム統計に基づいて動的に最適化されたことを示します。

以下の図は、パーティションがどのように分割されるかを示しています。

AQEを使用しない場合のシャッフル後のパーティション

AQEを使用する場合のシャッフル後のパーティション

結論

Spark 3.0のAdaptive Query Execution(AQE)は、ランタイムで物理プランを動的に最適化することでクエリパフォーマンスを大幅に向上させます。特に偏ったデータの処理において、AQEは偏ったパーティションを検出して調整し、より均等な実行とパフォーマンスの向上を実現します。AQE有効時と無効時の物理プランの比較は、AQEがもたらす最適化を明確に示しており、Sparkにおける大規模データ処理タスクの最適化において強力な機能であることを裏付けています。

Part3

以下はSpark 3.0におけるAdaptive Query Execution(AQE)に関するブログの続きであり、残りの重要な機能について説明します。

AQEの主な機能

シャッフルパーティションの動的な結合

AQEは、処理中のデータの実際のサイズに基づいて、実行時にシャッフルパーティションの数を減らすことができます。これにより、小さなパーティションが多すぎることによるオーバーヘッドを削減し、クエリの全体的なパフォーマンスが向上します。

偏ったデータの処理

AQEはデータの偏りを検出し、大きなパーティションを小さなものに分割することで対処します。これにより、単一のタスクが不均衡に長時間実行されることがなくなり、負荷分散が改善され、クエリ実行時間が短縮されます。

ジョイン戦略の最適化

実行時の統計情報に基づいて、AQEはジョイン戦略を動的に変更することができます。例えば、テーブルのサイズがメモリに収まるほど小さい場合、ソートマージジョインからブロードキャストジョインに切り替えることができます。

Apache Spark 3はAdaptive Query Execution(AQE)という大幅な改善を導入しました。この機能は実行時の統計情報に基づいてクエリプランを動的に調整し、パフォーマンスを最適化します。AQEが特に優れているのは、ビッグデータ処理において最もコストがかかる部分であるジョイン操作の最適化です。このブログ記事では、Spark 3におけるAQEがどのようにジョイン操作を最適化するかを探り、この機能を活用するためのベストプラクティスを提供します。

動的なジョインタイプの選択

従来のクエリプランニングでは、推定統計に基づいてジョインタイプ(例:ブロードキャストジョイン、シャッフルジョイン)を選択します。しかし、AQEは実際のデータサイズに基づいてこの決定を行います。例えば、テーブルが予想よりも小さい場合、AQEはより効率的なブロードキャストジョインに切り替えるかもしれません。

動的なパーティションプルーニング

AQEはジョイン操作中に不要なパーティションを動的にプルーニングすることができます。これは特に大規模なデータセットを扱う場合に有効で、ネットワークを介してシャッフルされるデータ量を減らします。

まず、動的なジョインタイプ選択について理解しましょう。以下はAQEの設定プロパティを設定するコードです:

spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1048576)

以下は実際のコードです:
以下は、Scalaを用いたサンプルデータの生成、DataFrameの作成、およびクエリ実行に関するコードの説明です。

import scala.util.Random

// 顧客と都市のシーケンスを定義
var customers = Seq("Ajay", "Raj", "Ashutosh", "Mangesh", "Sharad")
var cities = Seq("Aurangabad", "Mumbai", "Pune", "Jalna")

var random = new Random()

// 大規模なサンプルデータの生成
var customer1 = (1 to 10000).map { _ =>
  val id = random.nextInt(1000)
  val name = customers(random.nextInt(customers.length))
  val city = cities(0)
  (id, name, city)
}

var customer2 = (1 to 1000).map { _ =>
  val id = random.nextInt(1000)
  val name = customers(random.nextInt(customers.length))
  val city = cities(1)
  (id, name, city)
}

var customer3 = (1 to 100).map { _ =>
  val id = random.nextInt(1000)
  val name = customers(random.nextInt(customers.length))
  val city = cities(2)
  (id, name, city)
}

var customer4 = (1 to 10).map { _ =>
  val id = random.nextInt(1000)
  val name = customers(random.nextInt(customers.length))
  val city = cities(3)
  (id, name, city)
}

// 生成したシーケンスを一つに結合
var combinedCustomers = customer1 ++ customer2 ++ customer3 ++ customer4

var order1 = (1 to 10000).map { _ =>
  val id = random.nextInt(1000)
  val totalBill = random.nextInt(500) + 60
  val city = cities(0)
  (id, totalBill, city)
}

var order2 = (1 to 1000).map { _ =>
  val id = random.nextInt(1000)
  val totalBill = random.nextInt(500) + 60
  val city = cities(1)
  (id, totalBill, city)
}

var order3 = (1 to 100).map { _ =>
  val id = random.nextInt(1000)
  val totalBill = random.nextInt(500) + 60
  val city = cities(2)
  (id, totalBill, city)
}

var order4 = (1 to 10).map { _ =>
  val id = random.nextInt(1000)
  val totalBill = random.nextInt(500) + 60
  val city = cities(3)
  (id, totalBill, city)
}

// 生成したシーケンスを一つに結合
var combinedOrders = order1 ++ order2 ++ order3 ++ order4

// DataFrameの作成とキャッシュ
var combinedCustomersDF = spark.createDataFrame(combinedCustomers).toDF("id", "name", "CustCity").repartition(10)
var combinedOrdersDF = spark.createDataFrame(combinedOrders).toDF("id", "totalBill", "OrderCity").repartition(10)

combinedCustomersDF.cache()

// 一時ビューの作成
combinedCustomersDF.createOrReplaceTempView("Customers")
combinedOrdersDF.createOrReplaceTempView("Orders")
combinedCustomersDF.count

// クエリ実行
var resultDF = spark.sql(
  """
  SELECT * 
  FROM Customers
  INNER JOIN Orders
  ON CustCity = OrderCity
  WHERE Customers.name = 'Mangesh'
  """
)
resultDF.show(false)
resultDF.explain(true)

出力結果:

resultDF.show(false)
+---+-------+----------+---+---------+----------+                               
|id |name   |CustCity  |id |totalBill|OrderCity |
+---+-------+----------+---+---------+----------+
|6  |Mangesh|Aurangabad|645|144      |Aurangabad|
|280|Mangesh|Aurangabad|645|144      |Aurangabad|
|928|Mangesh|Aurangabad|645|144      |Aurangabad|
|101|Mangesh|Aurangabad|645|144      |Aurangabad|
|913|Mangesh|Aurangabad|645|144      |Aurangabad|
|802|Mangesh|Aurangabad|645|144      |Aurangabad|
|414|Mangesh|Aurangabad|645|144      |Aurangabad|
|866|Mangesh|Aurangabad|645|144      |Aurangabad|
|109|Mangesh|Aurangabad|645|144      |Aurangabad|
|29 |Mangesh|Aurangabad|645|144      |Aurangabad|
|712|Mangesh|Aurangabad|645|144      |Aurangabad|
|179|Mangesh|Aurangabad|645|144      |Aurangabad|
|621|Mangesh|Aurangabad|645|144      |Aurangabad|
|800|Mangesh|Aurangabad|645|144      |Aurangabad|
|280|Mangesh|Aurangabad|645|144      |Aurangabad|
|136|Mangesh|Aurangabad|645|144      |Aurangabad|
|78 |Mangesh|Aurangabad|645|144      |Aurangabad|
|675|Mangesh|Aurangabad|645|144      |Aurangabad|
|471|Mangesh|Aurangabad|645|144      |Aurangabad|
|133|Mangesh|Aurangabad|645|144      |Aurangabad|
+---+-------+----------+---+---------+----------+
only showing top 20 rows

実行プラン:

resultDF.explain(true)
 == Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [CustCity#8], [OrderCity#20], Inner, BuildLeft, false
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[2, string, false]),false), [plan_id=185]
   :  +- Filter ((isnotnull(name#7) AND (name#7 = Mangesh)) AND isnotnull(CustCity#8))
   :     +- InMemoryTableScan [id#6, name#7, CustCity#8], [isnotnull(name#7), (name#7 = Mangesh), isnotnull(CustCity#8)]
   :           +- InMemoryRelation [id#6, name#7, CustCity#8], StorageLevel(disk, memory, deserialized, 1 replicas)
   :                 +- AdaptiveSparkPlan isFinalPlan=false
   :                    +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=6]
   :                       +- LocalTableScan [id#6, name#7, CustCity#8]
   +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=182]
      +- LocalTableScan [id#18, totalBill#19, OrderCity#20]

AQE最適化のポイント

  • 動的ジョインタイプ選択: AQEは、BroadcastHashJoinを選択し、小さなデータセットをブロードキャストすることでジョイン操作を最適化します。

  • データのキャッシュと再利用: InMemoryTableScanとTableCacheQueryStageの使用は、中間結果がキャッシュされ、繰り返しデータスキャンが不要になり、パフォーマンスが向上することを示しています。

  • 動的パーティショニング: プランにはいくつかのExchange操作が含まれ、AQEがデータを動的に再パーティショニングして負荷を分散し、データの偏りを最小限に抑える方法が示されています。

  • 統計に基づく最適化: AQEは、実行時の統計情報(例: sizeInBytes, rowCount)を使用して、データのブロードキャスト、シャッフル、ジョイン戦略に関する情報に基づいた意思決定を行います。

Spark 3のAdaptive Query Executionは、ジョイン操作や全体的なクエリパフォーマンスに対して強力な最適化を提供します。実行時にプランを動的に調整することで、AQEはデータの偏りや最適でないジョイン戦略といったビッグデータの一般的な課題に対処します。これらの機能を理解し活用することで、Sparkアプリケーションの効率を大幅に向上させることができます。定期的なモニタリング、プロファイリング、チューニングが、特定のワークロードにおけるAQEの利点を最大限に活用するために不可欠です。

参考

Adaptive Query Execution in Spark 3.0: A Game Changer for Performance Optimization Part 1.
Adaptive Query Execution in Spark 3.0: A Game Changer for Performance Optimization Part 2.
Adaptive Query Execution in Spark 3.0: A Game Changer for Performance Optimization Part 3.

いいなと思ったら応援しよう!

Puuuii | 伝える技術と心理学で戦うデータエンジニア
え、チップくれるん? ありがとうなぁ! この恩は3日ぐらい忘れへんから🫡

この記事が参加している募集