見出し画像

ストリーミングジョブ変更リリースの注意点

はじめに

リアルタイムデータ処理では、データパイプラインの継続的な運用が求められます。しかし、システムを運用していると、スキーマの変更やバージョンアップが発生します。特にストリーミングジョブでは、処理を停止することなく新しいバージョンへ移行を求められることがあり、ダウンタイムの発生を最小限に抑える工夫が求められます。

一方で、全ての変更が処理を停止できるわけではありません。スキーマの変更には互換性のあるものとないものがあり、非互換な変更を適用する場合は、既存のジョブやデータに影響が及ばないよう一時的に停止をする必要があるケースが存在します。また、Dataflow における Apache Beam SDK のバージョンアップやデータ処理ロジックの変更に伴い、一時的に停止をすることがあります。

本記事では、一例として Google Dataflow を利用したリアルタイムパイプラインのリリース方法について整理します。特に、スキーマ変更やバージョンアップの際にサーバーを停止する必要があるケースと、そうでないケースを分類し、それぞれの適用方法や注意点について書いていきます。リリースを実現するための具体的な手段についても紹介するので、リアルタイムパイプラインを運用しているエンジニアにとって実践的な内容となれば幸いです。

前提

利用サービスと構成

今回の構成は分かり易さのため、非常にシンプルにしています。検証のために利用するサービスは、Google Cloud のSpanner, Dataflow, BigQuery になります。記事の中で、これらを選んだ理由としては、構築難易度や経済的に優しい(利用方法によっては無料である)ためです。もちろん、AWS や Snowflake, Databrics, その他クラウドサービスや SaaS 等を利用して頂いて結構です。

今回利用する構成

データソース(Spanner)の変更前スキーマ

データソース(Spanner)の変更前スキーマ

データレイク(BigQuery)の変更前スキーマ

データレイク(BigQuery)の変更前スキーマ

※ BigQuery はデータレイクハウスでもあるため記事中は「データレイク」と記載します。

ストリーミングジョブ(Google Dataflow)

記事の中心となるストリーミングジョブは、Google Dataflow を利用します。Dataflow にはテンプレートというものが存在し、ノーコードで実装することができます。今回利用するのは、Spanner change streams to BigQuery テンプレートになります。

参考になる記事はこちらです。

リリース時の考慮ポイント

リアルタイムデータパイプラインのリリースでは、バッチのデプロイとは異なり、データの流れを維持しながらシステムを更新する必要があります。 Google Dataflow のようなストリーミング処理を扱う場合、ジョブを止めずに変更を適用できるか、あるいはジョブを停止する必要があるかを慎重に判断しなければなりません。そのため、リリースの計画を立てる際には、以下のようなポイントを考慮する必要があります。

まず、スキーマの変更が既存のデータやパイプラインに与える影響を確認することが重要です。例えば、カラムの追加・削除やデータ型の変更は、多くの場合で非互換な変更となり、既存のジョブやクエリがエラーを引き起こす可能性があるため、慎重な対応が求められます。

次に、Dataflow のパイプライン自体の影響範囲を把握することも必要です。ストリーミングジョブの場合、ジョブの入れ替えをどのように行うかが課題となります。古いジョブをそのまま継続するのか、新しいジョブに切り替えるのか(新しいジョブを並行して動かし徐々に切り替えるのか)、といった戦略を選択する必要があります。

さらに、Dataflow における Apache Beam SDK やテンプレートのバージョンアップも慎重に進めるべきものになります。特定のバージョン間では API の非互換性が発生することがあり、事前に変更点を確認し、テスト環境での動作検証を徹底する必要があります。特に、実行環境の更新が必要な場合は、ジョブの再デプロイや設定の変更が必要になることもあるため、影響範囲を把握しておくことが重要です。

(また、完全な停止を伴うリリースではなく、Blue-Green デプロイや Canary リリースのように、新しいバージョンを一部のデータのみで試しながら適用する手法もあるとは思いますが今回は触れません。)

ここからは「ジョブを止めずに変更適用できるケース」と「ジョブを停止する必要があるケース」を紹介します。

1. ジョブを止めずに変更適用できるケース

リアルタイムデータパイプラインのバージョンアップやスキーマ変更を行う際、すべてのケースでサーバーを停止する必要があるわけではありません。互換性のある変更であれば、現在稼働しているジョブを維持しながら、新しい処理を適用することが可能です。 Google Dataflow のようなストリーミングデータ処理サービスでは、段階的な移行や動的なスキーマ適用を活用することで、ダウンタイムなしのリリースが実現できます。

スキーマレスな実装をしている場合

まず、互換性のあるスキーマ変更に関しては、既存のデータ処理パイプラインへの影響が最小限に抑えられるため、安全に適用できるケースが多いです。Dataflow のストリーミングジョブ自体が JSON や Protobuf を利用している場合、スキーマレスな処理を行うことで、新しく追加されたカラムを無視しながらデータを処理できるものもあるため、ジョブの停止を伴わずに適用可能です。
ただし、データソースに列が追加されているのに、データレイクに列が追加されておらず、そのままデータ連携されてしまうことに注意してください。

連携前後のスキーマを変更する場合

次にストリーミングジョブに対しては何も変更せず、データソースとデータレイクのスキーマを変更させるケース(ストリーミングジョブの処理内容がスキーマ一致を前提としている場合)です。
このケースでは、データレイクからスキーマ変更を行うことで仮にデータが連携されても NULL として連携されます。その後、データソースもスキーマ変更を行うことで値が入ります。

↓は、 Google Cloud Templates の例です。

・Before you add a new column to a Spanner change stream scope, first add the column to the BigQuery changelog table. The added column must have a matching data type and be NULLABLE. Wait for at least 10 minutes before you continue to create the new column or table in Spanner. Writing into the new column without waiting might result in an unprocessed record with an invalid error code in your dead letter queue directory.
・To add a new table, first add the table in the Spanner database. The table is automatically created in BigQuery when the pipeline receives a record for the new table.

Handle adding tracking tables or columns

上記の記載があるため、

  1. データレイク ( BigQuery )に新しい列を追加

  2. データソース( Spanner )に新しい列を追加

という順番に実行していきます。

1. データレイク ( BigQuery )に新しい列を追加
2. データソース( Spanner )に新しい列を追加

もし仮に上記の列追加の順番を逆にしたとしても、DLQ (デッドレターキュー)が実装されていればデータを復旧することができます。(主観ですが、最近のツールや SaaS, OSS の多くで実装されている気がします。)

DLQ により GCS に格納された JSON

2. ジョブを停止する必要があるケース

リアルタイムデータパイプラインの運用では、可能な限りダウンタイムを避けることが理想ですが、変更の内容によってはジョブを一時的に停止しなければならないケースもあります。特に ストリーミングジョブがスキーマチェックを実装している場合バージョンアップの場合 は、既存のジョブをそのまま維持できないため、安全なリリースのために一時的な停止が必要となります。

ストリーミングジョブがスキーマチェックを実装している場合

まず、スキーマの非互換な変更 がある場合は、既存のジョブが正しくデータを処理できなくなるため、ジョブを一度停止して新しいスキーマに対応したパイプラインをデプロイする必要があります。特に、スキーマが Avro や Protobuf 等を使用し、ストリーミングジョブがスキーマチェックを実装している場合にスキーマの検証が行われるため、非互換な変更があるとジョブが停止するリスクがあります。このようなケースでは、一時的にパイプラインを停止してスキーマの整合性を取った後に再リリースを行います。
(今回利用した Dataflow Templates では、スキーマチェックは無い想定なので一時的に停止する必要はありません。)

バージョンアップ

また、SDK や Templates のバージョンアップもジョブの停止が必要となる代表的なケースです。Dataflow は SDK のバージョンに依存しているため、破壊的変更を含むバージョンアップを行うと、既存のパイプラインが動作しなくなる可能性があります。特に、古い SDK では動作していた処理が新しいバージョンでは非推奨または削除されている場合、ジョブの再デプロイが必要になります。事前に十分なテストを行い、影響範囲を正しく把握しておくことが不可欠です。
(今回利用している Dataflow Templates では1ジョブあたり1テンプレートのため、一時的に停止が必要です。)

ロールバックが必要になった場合

これはリアルタイム連携に限った話では無いですが、リリース後にエラーが発生し、ロールバックが必要になった場合 も、一時的にジョブを停止して復旧作業を行う必要が出てきます。
特に、誤ったスキーマ変更やデータ処理のバグによって不正なデータが蓄積された場合は、問題のあるデータをクリーニング(もしくはDLQでの解消)し、正しい状態に戻した上でジョブを再稼働することが求められます。

まとめ

リアルタイムデータパイプラインのリリースは、バッチ処理とは異なり、データの流れを止めることなく変更を適用することが求められます。本記事では、Google Dataflow を用いたリアルタイム処理におけるスキーマ変更やバージョンアップのリリース方法について整理しました。

スキーマの変更には、互換性のある変更と非互換な変更があり、互換性のある変更であればジョブを停止することなく適用することが可能です。特に、スキーマレスな処理を行う場合や、データレイクのスキーマを先に変更することでデータの欠損を防ぐ手法について説明しました。
一方、ストリーミングジョブがスキーマチェックを実装している場合や Apache Beam SDK のバージョンアップ、データ処理ロジックの大幅な変更が必要な場合は、ジョブを一時的に停止し、慎重にリリースを進める必要があります。

また、誤った変更による影響を最小限に抑えるために、事前のテストや DLQ(デッドレターキュー)の活用、ロールバックの準備が必要となるケースもあります。

今回の検証では Google Cloud の Spanner, Dataflow, BigQuery を活用しましたが、他のクラウドサービスやデータ基盤でも同様の考え方が適用できます。

何か誤っている内容を発見しましたら、ご連絡ください。

関連記事


いいなと思ったら応援しよう!

zono
よろしければ応援お願いします! いただいたチップはデータエンジニアとしての活動費や勉強代、教育に使わせていただきます!