
Spark 4.0では何が変わるのか? (中編)
前回の前編の続きです。
Spark 4.0 previewをもとに、spark 4系では何が変わるのか整理したいと思います。
今回は、以下の内容について紹介していきます。Structured Streaming周りの内容が中心です。
Streaming State Data Source
Structured Streamingの状態データ(State Data)に対する読み取り機能を提供します。なお、書き込み追加機能については、今後のロードマップに含まれています。この辺りを理解するためには、Structured Streaming(構造化ストリーミング)の理解がある程度必要のため、先にこちらを簡単に説明します。
Structured Streaming
構造化ストリーミング(Structured Streaming)は、Spark SQLエンジン上に構築されたストリーム処理エンジンです。静的データに対するバッチ計算と同じようにストリーム計算を表現できる点が、ユーザ体験として優れています。
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
# Split the lines into words
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
# Generate running word count
wordCounts = words.groupBy("word").count()
Structured Streamingの基本的な考え方は、下図に示すように、DataStreamを小さく分割し、それぞれを無制限テーブル(Unbounded Table)に対する挿入レコードとして扱います。

無制限テーブル(Unbounded Table)は、入力データ全てを保持しているわけではなく、処理に必要な最小限の入力データあるいは中間データのみを保持します。無制限テーブル(Unbounded Table)に対するクエリ実行の結果として、結果テーブルの生成あるいは更新が行われます。ここで、結果テーブルは、更新の度に外部sinkにデータを書き込む必要があります。詳細は割愛しますが、外部sinkには、ファイルへの書き込み、kafkaへのデータ送信等を選択できます。
ここで、結果テーブルの更新内容をどこまで外部sinkに書き込むかには、3つの選択肢があります。
complete mode : 結果テーブルが更新された場合、全体を常に書き出します。
append mode: 結果テーブルのうち、新規に追加された行のみを書き出します。既存の結果テーブルのレコードが更新されないケースで有効です。後半でお話しするstateful operatorを使用する場合には、append modeである必要があります。
update mode: 結果テーブルのうち、更新された行のみを書きだします。既存の結果テーブルのレコードに更新がない場合、append modeと同等になります。
少し、話題はズレますが、spark の構造化ストリーミング処理は、デフォルトでは、マイクロバッチ処理で成り立っており、小さなデータの塊ごとに操作を行います。小さな塊ごとに操作を行うことで、ストリーミング処理のend2endでのレイテンシを100ミリ秒まで抑えつつ、exactly-once fault-tolerance (必ず1回の実行) を保証します。つまり、処理コンテナの再起動や再処理により、end2endで1回限りの実行を保証します。
一方で、spark 2.3以降は、継続処理(Continuous Processing)モードが追加されており、レイテンシを1ミリ秒に抑えた実行が可能になります。ただし、at-least-once fault-tolerance(少なくとも1回は実行)保証になる点には注意が必要です。二重のデータ生成が許容できるようなケース、後続システムで結果整合性(eventual consistency)が保証できれば十分なシステムのケースでは有効です。継続処理(Continuous Processing)モードは、Experimentalな機能のため、詳細については、また別の機会があればまとめてみたいと思います。なお、モード選択に関しては、アプリケーションのコードに変更を加えずに、変更可能です。
State Reader API
前節で説明の通り, Structured Streamingでは、テーブルデータや中間データを保持しており、データを小さい単位で逐次処理を行うマイクロバッチ処理にて、ストリーミング処理を実現しています。
ここでマイクロバッチ処理は、以下の2つに分類することができます。
Stateless operation : 現在処理中のマイクロバッチ以外の別のマイクロバッチのデータを知る必要がない処理
例)処理中のレコードの特定の値が10以上かどうかでフィルタしたい場合など
Stateful operation : 現在処理中のマイクロバッチ以外の別のマイクロバッチのデータを知る必要がある処理
例)一定時間のタイムウィンドウの中で計算される、特定のkeyのレコード数をカウントしたい場合など
spark 4.0で追加されるState Reader APIでは、後者のステートフルな処理を行う際の中間データの状態(Local State)を参照することができます。State Reader APIには、state-metadataとstatestoreの2つのデータフォーマットに対応しています。state-metadata形式では、statestoreにどんな情報が格納されているかのハイレベルな情報を提供します。例えば、statestoreのパーティション数などです。一方で、statestore形式では、key-valueデータそのものを参照できます。これらのAPIが存在する前は、開発者はstatestoreの中がわからず、過剰なロギングに頼った開発を強いられていました。またデータの不整合等の問題が生じた際にも、問題解決のための調査に時間を要しました。今回追加されたAPIは、これらの問題を解決し、出力結果とstatestoreの整合性の調査を効率化できます。
Arbitrary Stateful Processing V2
Statefulな集計ユースケースを考えた場合、単純な集計以外に、様々なケースへの対応が必要になってきます。例えば、特定のイベントに対するデータストリームからセッションを追跡して扱いたいケースです。
spark 4.0では、こうした任意の状態を持ったStateful操作に対応するために、新しいoperatorの導入と改善が行われています。
具体的には、transformWithState Operatorが追加されており、transformWithState を介して、TimerやComposite Types(Single Value, List, Map)等を用いて、Statefulで複雑な集計を実装することができます。
今回は、以上です。参考になれば幸いです。
後編もアップできるように、頑張ります。
参考
https://www.databricks.com/dataaisummit/session/whats-next-upcoming-apache-spark-40
https://spark.apache.org/docs/4.0.0-preview1/structured-streaming-programming-guide.html
https://www.databricks.com/blog/multiple-stateful-operators-structured-streaming