DatabricksのDelta Live Tables活用事例 - 大規模データの増分更新の実装
三菱UFJフィナンシャル・グループの戦略子会社であるJapan Digital Design(以下JDD)でデータエンジニアリングを担当している佐川です。
弊社のデータ基盤はDatabricksを利用しています。データエンジニアリングでパイプラインを構築する際に利用しているDelta Live Table(DLT)について、使い方の事例を共有させていただきます。少しでもご参考になれば幸いです。
DLTの概要
まず、簡単にDLTについて説明します。DLTは、ETL開発とデータマネジメントを容易にする機能です。Databricksローンチ時の発表では以下のメリットがあげられています:
データ依存性に対する理解
パイプラインに対する可視化
データをコードとして取り扱う
これらの機能は、データパイプライン管理ツールのdbtと類似した特徴を持っています。弊社ではセキュリティの制約もあり、dbtの利用は難しいため、代わりにDLTを中心にパイプラインを構築しています。
DLTの主要なメリット
DLTの主要な技術的メリットは、簡単に高速な増分アップデートを実現できる点です。データ量が多い場合、フルリフレッシュではなく増分アップデートを実装することで、処理時間とコストを大幅に削減できます。
実装における課題
しかし、シンプルなユースケースを外れると、実装に工夫が必要になります。最近直面した課題として、「データが多すぎるので、ログのディレクトリから特定の日(月末)だけピックアップして増分アップデートしたい」というケースがありました。
最初のアプローチ
まず試みたのは、月末のパスをリスト化して読み込む方法です:
# list_pathesを月末の日付をlistで返す関数の結果と仮定します
list_pathes = ["/log/20240930/file.csv", "/log/20241031/file.csv", "/log/20241130/file.csv"]
@dlt.table
def log_table():
df = None
for path in list_pathes:
temp_df = spark.read.csv(path)
if df is None:
df = temp_df
else:
df = df.unionAll(temp_df)
return df
この方法ではフルリフレッシュとなり、データ量と共に処理時間が線形的に増加してしまいました。
第二のアプローチ
次に、ストリーミング読み込みでのフィルタリングを試みました:
@dlt.table
def log_table():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/log")
.select(
"*",
col("_metadata.file_path").alias("file_path")
)
.withColumn("receipt_date", to_date(regexp_extract(col("file_path"), r"/(\d{8})/", 1)))
.filter(col("receipt_date") === last_day(col("receipt_date")))
)
Delta Tableにはファイルメタデータ列という予約カラムが存在します。このメタデータ列を使用してフィルタリングを試みました。が、この方法でも増分更新は機能しませんでした。
メタデータ操作とファイルパスに基づくフィルタリングが、Sparkの増分処理の最適化を妨げているようです。
Sparkは新規データの特定が困難になり、結果としてフルスキャンを実行することになってしまいました。
最終的な解決策
最終的に、以下のような2段階のアプローチを採用しました。
1.前処理タスク:
def file_exists(path):
try:
dbutils.fs.ls(path)
return True
except Exception:
return False
# list_pathesを月末の日付をlistで返す関数の結果と仮定します
for path in list_pathes:
dst_path = "/dbfs/persist/{}".format(path)
if file_exists(dst_path):
print(f"File already exists: {dst_path}")
else:
dbutils.fs.cp(src_path, dst_path)
2.DLTパイプライン:
@dlt.table
def raw_log_table():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/dbfs/persist/")
.select("*")
)
@dlt.table
def final_log_table():
return spark.readStream.table("LIVE.raw_log_table")
# その他のETL処理
この方法により、初期ロードは1時間程度、その後の更新は20分以下で完了するようになりました。
総括
実は、当初、第2のアプローチでうまく動くと勘違いしていて、それ中心に記事をまとめていました。
ところが、本稿を入稿しようとした寸前で、念の為、再実行したところ、増分になっていませんでした。
なぜ勘違いしたのかというと、
・元データはリアルタイムではなく、日別に書き出される。
・DLTジョブを実行して、元データが更新されていない同日中は高速な増分アップデートとなる。
・再実行したら、高速に動いたので、問題ないと勘違い。
・元データが更新されると、フルリフレッシュとなる。
という挙動でした。データエンジニアリングあるあるですが、ロジックで考えた結果と、実際に大規模なデータを流してみたときの結果がだいぶ異なる場合があります。
データ規模が大きい場合の検証はどうしても時間がかかるため、詳細な検証をやりすぎると納期に間に合わないので、どうすればいいかは永遠のテーマです。
以下、まとめです。
DLTで増分アップデートを実現するには、ストリーミングテーブルが前提となります。
入力のDataFrameに対する複雑なフィルタリングは、増分処理の最適化を妨げる可能性があります。
DLT内での複雑なデータ加工は避け、前処理でデータを整理してからシンプルなパイプラインを構築する方が効果的です。
大規模データでの検証は時間がかかるため、早期の実データでの動作確認が重要です。
この事例が、同様の課題に直面している方々の参考になれば幸いです。
Japan Digital Design株式会社では、一緒に働いてくださる仲間を募集中です。カジュアル面談も実施しておりますので下記リンク先からお気軽にお問合せください。
この記事に関するお問い合わせはこちら
Japan Digital Design 株式会社
Technology & Development Div.
Shinichi Sagawa