Databricksを活用したMLOps: Data + AI Summit 2024 現地参加で得た最新知見-参加報告②
三菱UFJフィナンシャル・グループ(以下MUFG)の戦略子会社であるJapan Digital Design(以下JDD)でMUFG AI Studio(以下M-AIS)に所属する、小林です。普段はMUFGに向けたAI施策のPoCやモデルの本番実装等を担当しています。
今回は、Databricks社が主催する年次最大イベント「Data + AI Summit 2024」に現地参加し、そこで得た知見をこのNoteにまとめていきます。本記事は全3回シリーズの第2回であり、主に「MLOps」に関連する個別セッションの内容と、個人的に参考になったポイントについてお伝えします。
「Data + AI Summit 2024」全体の概要や基調講演の詳細については、第1回の「Data + AI Summit 2024 現地参加で感じた活気とアップデートまとめ」をご覧ください。
MLOpsに関する公演を聞いて感じた全体の印象
MLOpsに関する様々な個別セッションに参加した結果、「いかにして大量モデルを自動で管理するか」がポイントだと感じました。以下に、このポイントを実現するために必要だと考える機能について述べます。
Databricks Asset Bundleを利用したCI/CD
Databricks Asset Bundles (DABs)は、ノートブック、クラスター、ジョブ、モデルレジストリの構成などをコードで管理し、Databricksワークスペースにデプロイする仕組みです。これにより、プロジェクトのリソースを一元管理し、自動化が可能となります。
大量モデルを複数ワーカーで並列学習する
複数のワーカーを使用して並列にモデルを学習させることで、時間を大幅に短縮できます。pandasUDFを活用することで、各ワーカーで独立した学習が可能になり、大規模なデータセットを効率的に処理できます。
これらの機能を活用することで、MLOpsの主要な課題である大量のモデル管理と効率的な学習を実現できます。公演を通じて、この分野での技術的進歩とその実践方法について深く理解することができました。
Databricks Asset Bundleを利用したCI/CD
Databricks Asset Bundleとは
Databricks Asset Bundles (DABs)は、ノートブック、クラスター、ジョブ、モデルレジストリなどの構成をコードで管理し、Databricksワークスペースにデプロイするための仕組みです。DABsを利用することで、これらの構成をGitHubなどのバージョン管理ツールで管理することができます。
主な機能と利点
リソースの一元管理と自動化:
YAMLファイルを使用してプロジェクトのアーティファクト、リソース、および構成を定義します。これにより、プロジェクト全体をコードとして表現し、ソースコントロールやコードレビュー、テスト、CI/CD(継続的インテグレーションとデリバリー)が容易になります。
CI/CDの統合:
DABsは既存のCI/CDシステム(GitHub Actions、Jenkins、Azure DevOpsなど)と連携して、テストとデプロイメントを自動化します。これにより、リリースサイクルが短縮され、エラーの減少を期待できます。
テンプレートの利用とカスタマイズ:
デフォルトのバンドルテンプレートを使用するか、カスタムテンプレートを作成して組織のベストプラクティスを実装できます。これにより、新しいプロジェクトの立ち上げが迅速かつ効率的になります。
複数ワークスペースでのデプロイ:
DABsを使用すると、複数のワークスペースにわたってプロジェクトを自動的にデプロイできます。これにより、変更管理とガバナンスが一貫して行われるようになります。
個別セッションでは、以下のようにDatabricks Asset Bundles(DABs)を利用したCI/CDの事例が紹介されていました。
Databricks Asset Bundles: A Unifying Tool for Deployment on Databricks
Databricks Asset Bundles(DABs)は、Databricks上でのコードとリソースのデプロイメントを簡素化し、統一するためのツールです。本公演では、Databricksのソフトウェアエンジニアと、84.51゜のクラウドエンジニアが、DABsの概要、実装方法、および実際の使用例について詳しく説明しました。
DABsの目的と重要性
DABsは、信頼性の高いソフトウェア開発のためのベストプラクティスを取り入れて、デプロイメントプロセスを統一し、自動化することを目的としています。これにより、エラーや遅延のリスクを軽減し、効率的な開発が可能になります。
ソフトウェア開発ライフサイクルのベストプラクティス
DABsは、以下のような標準的なソフトウェア開発ライフサイクルのプラクティスをサポートします:
コードレビュー: 変更はチームメンバーによってレビューされます。
自動テスト: 単体テストとエンドツーエンドの統合テストが行われます。
自動デプロイメント: コード変更が承認されると自動的にデプロイメントが実行されます。
環境分離: 開発、ステージング、本番環境が分離されます。
Databricks Asset Bundlesの構成要素
宣言的な設定フォーマット: YAML形式でリソース設定、ワークスペース設定、環境設定を一元管理します。
CLIによる操作: 設定のバリデーション、デプロイメント、リソースの操作をコマンドラインで実行できます。
環境ターゲットの設定: デプロイメントターゲットごとにリソース設定をカスタマイズできます。
具体的な使用例
DABsを使用することで、以下のような効果が得られます:
設定管理の簡素化: 複数のジョブやリソースを単一のYAMLファイルで管理し、環境ごとのカスタマイズが可能です。
環境間の差異管理: インスタンスプールなどのリソースIDを自動的にルックアップし、環境ごとの設定を容易に管理します。
デプロイメントの隔離: 各開発者が独立した環境で作業できるため、相互干渉を避け、効率的に開発が進められます。
実際のymlコードのサンプルおよび実行方法について以下に掲載します。詳細に関しては公式ドキュメントご参照ください。
また、実際にDABsを利用している84.51゜社の構成についても紹介されていました。内容を見るところ、コミットをトリガーにGitHub ActionsからDABsを実行し、自動でモデル構築、スコアリング、精度検証を行っているようです。
MLOps: Developing and Deploying ML Models with Databricks
Plenitude社による機械学習モデルのプロダクション化に向けて構築した包括的フレームワークについての説明セッションでした。フレームワークを構築するに至った課題、使用した主要なツール、そしてCI/CDプロセスの詳細について述べられています。
課題
Plenitude社では会社の成長過程でいくつかの課題に直面しました。
メンバーの多様化
異なるスキル、背景、経験を持つ人々同士で協力し機械学習モデルを運用する必要があり、より効果的な方法を見つけることが必要でした。運用工数の課題
初期の段階では多くの手作業が必要で、特にデータの取り込みに多くの調整が求められました。スケーラビリティの課題
ビジネスが高度な分析の価値を認識すると、モデルの需要が急増し、スケールアップし迅速に市場に投入する必要がありました。実験の課題
各データサイエンティストが独自のノートブックでテストを行っていましたが、実験管理が不十分でした。実験とパラメータを追跡するための確固たるアプローチが必要でした。
採用したソリューション
これらの課題に対処するため、Plenitude社はMLOpsを採用しました。プロセスの標準化、自動化の促進、継続的な開発を通じて、データサイエンティストが運用タスクに時間を取られることなくビジネス価値の創出に集中できるようにしました。
使用した主要なツール
Unity Catalog
Databricksが提供するデータ管理ツールで、データのアクセス制御を統一的に管理します。これにより、データサイエンティストやエンジニアがデータに安全かつ効率的にアクセスできるようになります。Databricks Asset Bundles
YAMLファイルにアーティファクトやリソースを指定し、Databricksのコマンドラインインターフェースを使用してデプロイ、検証、実行します。バンドルはスクラッチから作成することも、テンプレートを使用して作成することもでき、特にデプロイメントフェーズで役立ちます。これにより、コラボレーションが改善され、コンプライアンスが確保され、機械学習モデル運用のユースケース標準化が行われました。
CI/CDパイプライン
DevOpsで開発され、TDDアプローチを採用していました。
CIパイプライン
プルリクエストが作成されると、テストがトリガーされ、CIパイプラインが開始します。静的コード解析、ユニットテスト、統合テストを実行します。これにより、コードの品質とセキュリティが確保されます。CDパイプライン
各環境(サンドボックス、ステージング、プロダクション)でプロダクションデータにアクセスできます。コードとモデルをデプロイし、ステージング環境で最終トレーニングとテストを実行してから、プロダクションで安定したバージョンをデプロイします。新しいモデルのパフォーマンスを既存のものと比較し、優れている場合は新しいチャンピオンとしてプロダクションにデプロイします。
また、本セッションでは実施にファイルを更新した際のCI/CDについてのデモが公開されておりました。
CI/CDのデモ
1.コードの変更とコミット
READMRファイルを開き、テスト用のコードを1行書き込みます。変更をコミットし、メインブランチにマージします。これにより、PRパイプラインが開始します。
2.テストの実行
PRをトリガーにユニットテストが実行しテスト結果をキャッシュします。
3.アーティファクトの公開と脆弱性のスキャン
テストが完了すると、結果を追跡し、アーティファクトを公開します。BanditやVeracodeで脆弱性をスキャンし、サイバーセキュリティチームとレポートを共有します。
4.CDパイプラインの実行
Databricks CLIのインストールを検証し、認証情報を設定してバンドルをデプロイします。トレーニングワークフローを実行し、実行を監視し、モデルをカタログに登録し、MLflowで全モデルを追跡します。また、サンドボックス環境からステージング環境、プロダクション環境まで、各環境でプロダクションデータにアクセスできます。
5.モデルの評価とデプロイ
ターゲット作成とモデルトレーニングのタスクを並行して実行します。新しいモデルをチャレンジャーとして検証し、チャンピオンモデルとパフォーマンスを比較します。チャレンジャーが優れている場合、それが新しいチャンピオンモデルとしてデプロイされます。
以上がDABsを利用したCI/CDの内容でした。コード変更をトリガーにコードのテストだけでなくモデルも作成し、作成したモデルのパフォーマンスを測り問題がないかまで自動で確認できるのが素晴らしい内容だと感じました。これを利用することで機械学習プロダクトのテストからデプロイまで自動化でき、運用時に余計なコストを掛けずにすむと感じました。
大量モデルを複数ワーカーで並列学習する
本件については、以下の個別セッションでpandasUDFを活用した実装方法が紹介されました。ここでは、実際のコードを基にpandasUDFを活用した実装方法を説明します。
Scaling MLOps to Retrain 50k Weekly Models in Parallel Using UDFs.
個別セッションで紹介された内容では、週に50,000のモデルを訓練・再学習する必要があり、pandasUDFを利用することで大量のモデルの学習を効率的に行っています。pandasUDFを使用することで、複数のワーカーで並列にモデルを構築できるため、迅速かつ簡易に大量のモデルを生成できることが本発表のポイントです。
また、ここで紹介されるpandasUDFのユースケースとしては、1つのPySpark DataFrameの中に「複数セグメントのデータが存在しており、セグメント毎にモデルを構築する」といった内容です。例としては「株式の銘柄コード単位でモデルを構築する」や「店舗毎の売上予測モデルを構築する」等が考えられます。
ただ、pandasUDFは非常に柔軟性に富んでいるため、利用したいユースケースに合わせてデータセットを用意すれば、さまざまな使い方ができます。(例えば、5-foldクロスバリデーションでモデルを作成する際に、複数ワーカーで並列に学習する等)
それでは、個別セッションの内容を踏まえ、実際にpandasUDFを利用したモデル学習コードを記載します。
前提
使用するMLクラスター:15.2ML
クラスターサイズ:r5d.large
ワーカー数:5
オートスケール:なし
回帰問題に対してLightGBMを利用
まず、学習に利用するダミーデータを用意します。ここでは10個のプロダクトグループを想定しており、各プロダクト単位でモデルを構築します。
import pandas as pd
import numpy as np
import pyspark.sql.functions as F
# データの行数
num_records = 10000000
# ダミーデータの生成
data = {
'record_id': np.arange(1, num_records + 1),
'product_id': np.random.choice(['product_1', 'product_2', 'product_3', 'product_4', 'product_5',
'product_6', 'product_7', 'product_8', 'product_9', 'product_10'], num_records),
'numeric_feature_1': np.random.rand(num_records),
'numeric_feature_2': np.random.rand(num_records),
'label': np.random.rand(num_records) * 100 # 実際のデータでは、これは売上高やコンバージョンなどになる
}
# 特徴量セット
features = ["numeric_feature_1","numeric_feature_2"]
# DataFrameの作成
df = pd.DataFrame(data)
# データの確認
print(df.head())
#sparkDataFrameの作成
df = spark.createDataFrame(df)
df.printSchema()
次に、学習用pandasUDFを定義します。ここでのポイントは以下の通りです。
pandasUDF適用時はapplyInPandas関数を利用する。
学習時に使用するmlflow.run_idは複数モデルで同一にするため、pandasUDF宣言側で生成し、pandasUDF内ではデータフレームに定義してあるrun_idを利用する。
複数ワーカーで学習していることを確認するため、MLflowのアーティファクトにproduct_id_ホスト名.txtを生成し、実行時間を記録する。
複数ワーカーで学習しモデルをMLflowへ書き込むため、レートリミットを防ぐためにランダムにtime.sleep処理を入れる。
applyInPandasの概要
目的: Pandas UDFを用いて、グループ化されたデータに対して関数を適用し、結果をSpark DataFrameとして返す。
引数
関数: グループごとに適用する関数。入力としてPandas DataFrameを受け取り、Pandas DataFrameを返す。
スキーマ: 関数の戻り値のスキーマを指定する。これはSpark DataFrameとして結果を返すために必要。
import lightgbm as lgb
import mlflow
import random
import time
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import socket
from datetime import datetime, timedelta, timezone
def now():
return datetime.fromtimestamp(time.time(), tz=timezone.utc).astimezone(
timezone(timedelta(hours=9))
)
def fit_udf(training_pdf):
# 学習対象grpの取得
product_id = training_pdf["product_id"].values[0]
# recordサイズ取得
training_sample_size = training_pdf.shape[0]
# 複数ワーカーでの処理を確認するため処理実行時間&workerのhostを取得
host_name = socket.gethostname()
start_time = now()
# run_name
run_name = f"{product_id}_{host_name}"
X = training_pdf[features]
y = training_pdf["label"]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
train_data = lgb.Dataset(X_train, label=y_train)
params = {"objective": "regression", "metric": "rmse"}
model = lgb.train(
params,
train_data,
)
# Evaluate model
predictions = model.predict(X_test)
rmse = np.sqrt(mean_squared_error(y_test, predictions))
# Log to MLFlow
# run_idの取得
run_id = training_pdf["run_id"].values[0]
# experiment_idの取得
experiment_id = training_pdf["experiment_id"].values[0]
run_params = {"run_id": run_id, "experiment_id": experiment_id}
with mlflow.start_run(**run_params, nested=True) as run:
# MLflow rate limit対策
random.seed(abs(hash(product_id)) % (10**4))
random_wait = random.random()
time.sleep(random_wait * 5) # Wait up to 5 seconds
mlflow.lightgbm.log_model(
model, artifact_path=f"model_{product_id}"
)
artifact_uri = f"runs:/{run_id}/model_{product_id}"
# 学習を行ったワーカーをMLflowへ保存
end_time = now()
message = f"start time : {start_time} \n end time : {end_time} "
mlflow.log_text(message, f"{run_name}.txt")
return_df = pd.DataFrame(
[[product_id, training_sample_size, artifact_uri, rmse]],
columns=["product_id", "training_sample_size", "model_path", "rmse"],
)
return return_df
次に、実際に学習する際の実行コードを定義します。
まず初めに、pandasUDFの返り値用にSpark DataFrameを定義します。
from pyspark.sql.types import (
StructType,
StructField,
IntegerType,
StringType,
FloatType,
)
trained_models_info_schema = StructType(
[
StructField("product_id", StringType()),
StructField("training_sample_size", IntegerType()),
StructField("model_path", StringType()),
StructField("rmse", FloatType()),
]
)
次に、実際に学習を行います。pandasUDF呼び出し前にMLflowのrun_idを取得し、その値をpandasUDFへ渡すことがポイントです。
with mlflow.start_run() as run:
# sparkの遅延処理特性により、ここでモデルは生成されない
model_paths_df = (
df
.withColumn('run_id', F.lit(run.info.run_id))
.withColumn('experiment_id', F.lit(run.info.experiment_id))
.groupBy('product_id')
.applyInPandas(fit_udf, schema=trained_models_info_schema)
)
# ここでモデル生成
model_paths_df.display()
以下に学習結果のMLflowのアーティファクトを示します。ご覧のとおり、1つのrun_idに複数のモデルが登録されており、さらにテキストファイル名から複数のワーカーで学習されていることが分かります(ホスト名が異なる)。
予測処理について
次に、予測処理について説明します。予測処理に利用するSpark DataFrameを生成し、モデルパスを定義したSpark DataFrameと結合させ、予測用のpandasUDFで予測処理を実施する流れとなります。
初めに、テスト用のダミーデータを用意します。
# データの行数
num_records = 1000
# ダミーデータの生成
data = {
'record_id': np.arange(1, num_records + 1),
'product_id': np.random.choice(['product_1', 'product_2', 'product_3', 'product_4', 'product_5',
'product_6', 'product_7', 'product_8', 'product_9', 'product_10'], num_records),
'numeric_feature_1': np.random.rand(num_records),
'numeric_feature_2': np.random.rand(num_records),
}
# 特徴量セット
features = ["numeric_feature_1","numeric_feature_2"]
# DataFrameの作成
df_test = pd.DataFrame(data)
# データの確認
print(df_test.head())
#sparkDataFrameの作成
df_test = spark.createDataFrame(df_test)
df_test.printSchema()
続いて、モデルのアーティファクトパスを取得し、product_id毎に保存されているモデルパスを取得し、Spark DataFrameを生成します。
なお、パスの取り方についてですが、モデル保存時の命名ルールとしてf"model_{product_id}"としているので、それを前提としてパスを取得している点にご注意ください。
import mlflow
# モデル生成時のrun_idを設定
run_id = run.info.run_id
# アーティファクトのリストを取得
artifact_list = mlflow.client.MlflowClient().list_artifacts(run_id)
artifact_paths = []
product_ids = []
# 各アーティファクトのパスを表示
for artifact in artifact_list:
if "model" in artifact.path:
artifact_path = f"runs:/{run_id}/{artifact.path}"
artifact_paths.append(artifact_path)
product_ids.append(artifact_path.split("/")[-1].replace("model_", ""))
pathDF = spark.createDataFrame(
pd.DataFrame(
{
"product_id": product_ids,
"model_path": artifact_paths,
}
)
)
pathDF.display()
# ダミーテストデータとの結合
df_test = df_test.join(pathDF,"product_id","inner")
df_test.display()
予測処理用のpandasUDFの定義
次に、予測処理用のpandasUDFを定義します。ポイントは以下の通りです。
MLflowのレートリミットに抵触しないようにランダムにsleepを入れること。
大量のモデルロードや予測をワーカーで行うことでワーカーメモリが枯渇する可能性があるため、推論が終わった後、モデルオブジェクトを削除することでこのメモリエラーを回避します。
import os
def predict_udf(features_w_models_pdf):
# グループとモデルパスをデコード
this_group = features_w_models_pdf['product_id'].values[0]
this_model = features_w_models_pdf['model_path'].values[0]
# リセットされたランダムシードでジッターを追加
random.seed(abs(hash(this_group)) % (10 ** 4))
rand_wait = random.random()
time.sleep(rand_wait * 5) # 最大5秒のランダムな待機時間を追加
# モデルアーティファクトのための/tmpディレクトリを設定
os.system(f'mkdir -p /tmp/{this_group}/')
# モデルをロード
this_model = mlflow.lightgbm.load_model(
this_model,
dst_path=f"/tmp/{this_group}/"
)
features_w_models_pdf['prediction'] = this_model.predict(features_w_models_pdf[features])
# 推論が終わったらモデルアーティファクトを削除
os.system(f'rm -r /tmp/{this_group}/')
return features_w_models_pdf
最後に予測処理を実行します。返り値としてはtest用のDataFrameのカラム+予測値になるため、返り値用のpysparkDataFrameを定義します。
推論処理自体もpandasUDFを適用することでproduct_id毎に予測が行われます。
from pyspark.sql import types as T
# 戻り値のスキーマを定義
df_json = df_test.select('*').schema.jsonValue()
preds_schema = (
T.StructType()
.fromJson(df_json)
.add(T.StructField('prediction', T.FloatType()))
)
# 推論を適用
df_w_preds = (
df_test
.groupBy('product_id')
.applyInPandas(predict_udf, preds_schema)
)
# 推論実施
df_w_preds.display()
以上が、pandasUDFを利用した複数ワーカーでの学習と推論についての説明でした。個人的には、pandasUDF内に普段記載している学習コードや予測コードを記載するだけで簡単に並列処理ができる点が非常に使いやすいと思いました。また、今回は単純なコードを記載しましたが、書き方によっては5-foldクロスバリデーションを並列で行ったり、ハイパーパラメータチューニングを行うなど、カスタマイズが簡単にできる点も非常に良いと感じました。ぜひ、自社プロダクトでも活用したいと思います。
Japan Digital Design株式会社では、一緒に働いてくださる仲間を募集中です。カジュアル面談も実施しておりますので下記リンク先からお気軽にお問合せください。
この記事に関するお問い合わせはこちらにお願いします。