Dataproc サーバレスでBigQueryのデータを加工するクイックスタート
概要
PySparkを使用してBigQueryのデータを加工するバッチ処理をDataproc サーバレスで動かしたので、その時の体験記を残す。
以下の記事を実行するうえで同記事内では直接書かれていないことも含めて記載する。
事前準備
VPCサブネットの作成
私はDataproc用のVPCネットワークを作成して、そこにサブネットを作成する方法を取りました。(defaultをいじりたくなかったため)
ネットワーク設定の具体的な方法は以下の記事をまねしました。
Cloud Storage バケットの作成
pysparkのコード内で使用するGCSバケットを準備する。
pysparkのコードは上記の「BigQuery コネクタを Dataproc Serverless for Spark とともに使用する」記事内のコードを適宜書き換えてそのまま使用する。
BigQueryデータセットの作成
Cloud Storageのバケットと同様にコード内で使用するBigQueryのデータセットを準備する。
sparkジョブをsubmitする
基本コマンド
ここはいくつかはまりポイントがあったので解決策含め記載する。
gcloudコマンドでジョブをsubmitする。基本コマンドは上記の記事内の通り以下。
gcloud dataproc batches submit pyspark wordcount.py\
--region=region \
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-version.jar \
... other args
はまりポイント1:windows環境でローカルのソースを使用してsubmitする場合
まずローカルのpythonファイルを使用する場合windowsだと私の環境ではうまくできなかった。エラー原因は以下の通りで、GCSオブジェクトに\が入ってしまうのが問題のよう。windows環境だとダメのようなので、linux環境を使用する(docker, WSL, Cloud Shellなど)を使用するとよい。
試してないけど、GCS上にソースコードをアップロードしてそれを指定すればたぶんwindows環境でもこの問題を回避できる気がする。
java.lang.IllegalArgumentException: Illegal character in path at index 55: gs://[bucket-name]/dependencies\sample_spark.py
はまりポイント2:BigQueryコネクタを使用する場合のClassNotFoundException
ジョブのsubmit時にsparkランタイムのバージョンを指定しないと、デフォルトのバージョンを使用する仕様だが、そのバージョンとBQコネクタのjarファイルのバージョンを合わせる必要があるらしい。
参考にしたのは以下の記事。
submit時にランタイムを指定して、jarファイルのバージョンをそれに合わせたものにするのが安全そう。この問題はsparkランタイムのversion2がリリースされた2023年1月以降に発生するようになったみたいなので、少し古い記事だとコマンドでランタイム指定を明示的にしているものはあまりなさそう。
完成コマンド
上記をふまえて以下のコマンドを実行してsparkジョブをsubmitした。
gcloud --project [project-name] dataproc batches submit pyspark [python-file-name] \
--version=2.0 --batch=sample-spark --region=[region] \
--deps-bucket=[bucket-name] \
--subnet [subnet-name] \
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-0.29.0.jar