見出し画像

Vertex AI PipelinesでBigQueryの前処理とAutoMLの学習を自動化したよ

こんにちは、すずきです。
年末ですね。今年のベストバイはゴマキの写真集です。今年どころか人生ベストバイかもしれません。生きていることに感謝。神様、ありがとうございます(無宗教)

ところで、BigQueryに日々追加されるデータを月1でモデル学習に利用していましたが、前処理やラベル付け、学習の一連の手順が手作業でかなりミスが発生していました。運用効率を上げるため、Google CloudのVertex AI Pipelinesでこれらのプロセスを自動化しました。


Vertex AI Pipelinesとは

Vertex AI Pipelinesは、Google Cloud上で機械学習パイプラインを構築・管理できるサービスです。Kubeflow Pipelines SDKやTFX Pipeline DSLで、Kubernetesクラスタの管理を意識することなく、サーバーレス環境で効率的にパイプラインを実行できます。

Google Cloudのエコシステムとの親和性が高く、以下のような利点があります。

  • BigQueryやAutoML用のコンポーネントが用意されている

  • Cloud Schedulerを利用せずにスケジュール実行が可能

  • サーバレスでスケーラブル

実装手順

ファイル構成

パイプラインの構成と実行に必要なファイルは以下のように管理しています。

- sql_queries/
  - add_reaction_label.sql
  - training_pre_processing.sql
  - undersampling.sql
- pipeline_definition.py
- pipeline_notebook.ipynb
  • sql_queries: 前処理用SQLを格納

  • pipeline_definition.py: パイプラインの定義を記述

  • pipeline_notebook.ipynb: パイプラインのコンパイルおよび実行用ノートブック

パイプラインの構成

以下がパイプライン定義です。

from google.cloud import aiplatform
from kfp import dsl
from kfp.dsl import component
import datetime
from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp
from google_cloud_pipeline_components.v1.automl.training_job import AutoMLTabularTrainingJobRunOp
from google_cloud_pipeline_components.v1.dataset import TabularDatasetCreateOp
from google_cloud_pipeline_components.v1.vertex_notification_email import VertexNotificationEmailOp


aiplatform.init(project=PROJECT_ID, location="asia-northeast1")

RECIPIENTS_LIST = [""] # NOTE: メールアドレスを3つまで登録できる
PROJECT_ID = ""

def load_sql(file_path):
    with open(file_path, "r") as file:
        return file.read()


@dsl.component(base_image='python:3.12', packages_to_install=['google-cloud-bigquery'])
def create_bigquery_op(dataset_name: str, location: str) -> str:
    """Creates a BigQuery dataset if it does not exist."""
    from google.cloud import bigquery
    client = bigquery.Client(project=PROJECT_ID)
    dataset_id = f"{PROJECT_ID}.{dataset_name}"
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = location
    dataset = client.create_dataset(dataset, exists_ok=True)
    print(f"Dataset {dataset_id} created.")
    return dataset_id


@dsl.pipeline(name="data-preprocessing-and-training-pipeline")
def my_pipeline():    
    # NOTE: 実行日を基にデータセット名を生成
    today = datetime.date.today().strftime('%Y%m%d')
    dest_dataset = f"pre_processed_dataset_{today}"
    create_dataset_op = create_bigquery_op(
        dataset_name=dest_dataset, location="asia-northeast1")

    label_sql = load_sql("sql_queries/add_reaction_label.sql")
    preprocess_sql = load_sql(
        "sql_queries/training_pre_processing.sql")
    undersampling_sql = load_sql("sql_queries/undersampling.sql")

    formatted_label_sql = label_sql.format(
        dataset=dest_dataset, table="add_reaction_label")
    formatted_preprocess_sql = preprocess_sql.format(
        dataset=dest_dataset, table="training_preprocessed")
    formatted_undersampling_sql = undersampling_sql.format(
        dataset=dest_dataset, table="summary_all_processed_undersampling")

    notify_email_op = VertexNotificationEmailOp(recipients=RECIPIENTS_LIST)

    with dsl.ExitHandler(notify_email_op):
        label_sql_op = BigqueryQueryJobOp(
            query=formatted_label_sql,
            location="asia-northeast1",
            project="{{$.pipeline_google_cloud_project_id}}"
        ).after(create_dataset_op)

        preprocess_sql_op = BigqueryQueryJobOp(
            query=formatted_preprocess_sql,
            location="asia-northeast1",
            project="{{$.pipeline_google_cloud_project_id}}"
        ).after(label_sql_op)

        undersampling_sql_op = BigqueryQueryJobOp(
            query=formatted_undersampling_sql,
            location="asia-northeast1",
            project="{{$.pipeline_google_cloud_project_id}}"
        ).after(preprocess_sql_op)

        dataset_create_op = TabularDatasetCreateOp(
            display_name=f"tabular_dataset_from_bigquery_{today}",
            bq_source=f"bq://{PROJECT_ID}.{dest_dataset}.summary_all_processed_undersampling",
            project="{{$.pipeline_google_cloud_project_id}}",
            location="asia-northeast1"
        ).after(undersampling_sql_op)

        model_training_op = AutoMLTabularTrainingJobRunOp(
            display_name=f"visitor_prediction_model_{today}",
            dataset=dataset_create_op.outputs["dataset"],
            target_column="reaction_score",
            training_fraction_split=0.8,
            validation_fraction_split=0.1,
            test_fraction_split=0.1,
            budget_milli_node_hours=72000,
            project="{{$.pipeline_google_cloud_project_id}}",
            optimization_prediction_type="classification",
            location="asia-northeast1"
        ).after(dataset_create_op)

データセット名にパイプライン実行日の日付を含める

パイプラインの実行ごとに、処理対象のデータを区別するためにデータセット名に実行日の日付を含めています。create_bigquery_opコンポーネントを使用して、BigQueryにデータセットを作成する際に実行日の情報を動的に生成し、それをデータセット名に反映します。

today = datetime.date.today().strftime('%Y%m%d')
dest_dataset = f"pre_processed_dataset_{today}"
create_dataset_op = create_bigquery_op(
    dataset_name=dest_dataset, location="asia-northeast1")

SQLの読み込みと動的変数の使用

パイプライン内で複数のSQLクエリを実行する際、同じパイプラインを他のデータセットやテーブル名で再利用できるように、SQL内のパラメータを動的に変更する仕組みを導入しました。

load_sql関数でSQLファイルを読み込んだ後、Pythonの.formatメソッドを用いて{dataset}や{table}などのプレースホルダに動的な値を埋め込んでいます。

def load_sql(file_path):
    with open(file_path, "r") as file:
        return file.read()

undersampling_sql = load_sql("sql_queries/undersampling.sql")

formatted_undersampling_sql = undersampling_sql.format(
        dataset=dest_dataset, table="summary_all_processed_undersampling")

以下が例として使用したundersampling.sqlです。

CREATE OR REPLACE TABLE `{dataset}.{table}` AS
WITH class_counts AS (
  SELECT reaction_score, COUNT(*) as count
  FROM `{dataset}.training_preprocessed`
  GROUP BY reaction_score
),
median_count AS (
  SELECT APPROX_QUANTILES(count, 2)[OFFSET(1)] as target_count
  FROM class_counts
)
SELECT data.*
FROM `{dataset}.training_preprocessed` data
JOIN class_counts
ON data.reaction_score = class_counts.reaction_score
JOIN median_count
ON TRUE
WHERE RAND() < (median_count.target_count / class_counts.count);

Google Cloud Pipeline Components

Google Cloud Pipeline Componentsは、Google Cloudのさまざまなサービスを簡単に利用できるように設計されたカスタムコンポーネントのセットです。パイプライン内で直接使用することで、複雑な処理を簡潔に記述できます。

以下が今回使用したコンポーネントです。

  • BigqueryQueryJobOp: BigQueryでSQLを実行する

  • TabularDatasetCreateOp: BigQueryのテーブルをVertex AIのデータセットとして登録する

  • AutoMLTabularTrainingJobRunOp: Vertex AIで表形式データのAutoMLトレーニングジョブを実行する

  • VertexNotificationEmailOp: 指定したメールアドレスに通知を送信する

コンポーネント間の依存関係の制御

パイプラインでは、各コンポーネントが依存する前段の処理が完了してから実行されるように.after()メソッドを使用して実行順序を制御します。

label_sql_op = BigqueryQueryJobOp(
    query=formatted_label_sql,
    location="asia-northeast1",
    project="{{$.pipeline_google_cloud_project_id}}"
).after(create_dataset_op)

パイプライン終了後にメール通知を送信

パイプラインの処理がすべて完了した際に通知を送るために、dsl.ExitHandlerを使用しています。以下のように、メール通知用のVertexNotificationEmailOpコンポーネントをExitHandler内で設定することで、パイプラインの終了時に自動的に通知が送信されます。

notify_email_op = VertexNotificationEmailOp(recipients=RECIPIENTS_LIST)

with dsl.ExitHandler(notify_email_op):
    label_sql_op = BigqueryQueryJobOp(
        query=formatted_label_sql,
        location="asia-northeast1",
        project="{{$.pipeline_google_cloud_project_id}}"
    ).after(create_dataset_op)
処理完了後にこのようなメールが届きます

パイプライン実行

Vertex AI Pipelinesを実行するには、パイプラインをコンパイルし、オンデマンドまたはスケジュールで実行します。今回はVertex AI Workbenchを実行環境として使用しています。

Vertex AI Workbench

オンデマンド実行

オンデマンド実行では、まずパイプラインの定義をYAML形式にコンパイルします。その後、コンパイルされたYAMLを使用してパイプラインを実行します。

以下のコードで、pipeline_definition.pyに記述したパイプライン定義をコンパイルし、compiled_pipeline.yamlというファイルに保存します。

from google.cloud import aiplatform
from kfp import compiler
import pipeline_definition

aiplatform.init(project=<project-id>, location="asia-northeast1")

compiler.Compiler().compile(
    pipeline_func=pipeline_definition.my_pipeline,
    package_path="compiled_pipeline.yaml"
)

コンパイルしたcompiled_pipeline.yamlを使用して、以下のコードでパイプラインを実行します。

aiplatform.PipelineJob(
    display_name="data-preprocessing-and-training-pipeline",
    template_path="compiled_pipeline.yaml",
    parameter_values={},
    # enable_caching=False
).submit()

実行が成功すると、以下のようにVertex AIのコンソールに結果が表示され、各ステップの進捗や状態を確認できます。

パイプライン

スケジュール実行

パイプラインを定期的に実行する場合は、PipelineJob.create_scheduleメソッドを使用してスケジュールを作成します。以下は、毎月1日の9:00(JST)に実行されるスケジュールを作成する例です。

pipeline_job = aiplatform.PipelineJob(
    display_name="data-preprocessing-and-training-pipeline",
    template_path="compiled_pipeline.yaml",
    parameter_values={},
    # enable_caching=False
)

pipeline_job_schedule = pipeline_job.create_schedule(
    display_name="monthly-data-preprocessing-and-training",
    cron="TZ=Asia/Tokyo 0 9 1 * *", # JSTで毎月1日の9:00
    max_concurrent_run_count=1,
    max_run_count=None
)

スケジュールが登録されると、Vertex AIのスケジュールタブで確認でき、以下のような画面が表示されます。

実行スケジュール

参考

採用情報


いいなと思ったら応援しよう!