見出し画像

Airflow 2.0 でDAG定義をよりシンプルに! TaskFlow APIの紹介

電通デジタルでバックエンド開発をしている松田です。

前回の記事は「広告出稿プランニング業務におけるセグメントのマッピングと表示改善」でした。

Dentsu Digital Tech Advent Calendar 2020 9 日目の記事になります。この記事ではAirflow 2.0で追加された機能の一つであるTaskFlow APIについて、PythonOperatorを例としたDAG定義を中心に1.10.xとの比較を交え紹介します。

弊社のAdvent Calendarでは、Airflow 2.0に関するものはこれまでにHAスケジューラの記事がありました。Airflow 2.0で提供される新しい機能について詳しく知りたい場合はAirflow Planningを参照ください。

TaskFlow APIとは?

TaskFlow APIとはざっくり言うと、タスク間の暗黙的なデータ連携を明示的に宣言できる機能です。

AIP-31: "TaskFlow API" for clearer/simpler DAG definitionやAstronomerさんのIntroducing Airflow 2.0も参照ください。

では1.10.xのDAG定義における課題は何か、どのようにその課題が解決されるかを具体的にみていきます。

Airflow 1.10.xのDAG定義における課題

Airflow 1.10.xではタスクによっては暗黙的な依存関係が発生し、依存関係を追いにくいという課題があります。

例えば下図で示すようにXComを使用する場合タスク間に暗黙的にデータの依存関係ができるので、あるタスクがデータをpushしたかどうか、そのデータを下流のタスクでpullしたかどうかについてそのタスクの処理内容を確認する必要があります。

画像1

以下にextract, transform, loadタスクの3つのタスクで構成される1.10.xスタイルでの単純なDAG定義サンプルを示します。

最初のextract以外の各タスクが前のタスクの出力を必要とするのでextract→transform→loadの順番で実行される必要があります。ただしそれぞれのタスクが上流のどのタスクの出力に依存しているかは明瞭でありません。それ故それぞれの関数の定義内にあるXComの値にアクセスするコードを確認する必要があります。

import json

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator

default_args = {
   'owner': 'airflow',
}

###########################
# 1.DAGの定義
###########################
with DAG(
   'task_flow_classic',
   default_args=default_args,
   schedule_interval=None,
   start_date=days_ago(2),
   tags=['example'],
) as dag:
   ###########################
   # 2.タスクの定義
   # タスクで呼び出す関数内で暗黙的なデータの依存関係が定義される
   ###########################
   def extract():
       data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
       order_data_dict = json.loads(data_string)

       return order_data_dict


   def transform(ti):
       order_data_dict = ti.xcom_pull(task_ids="extract")

       total_order_value = 0
       for value in order_data_dict.values():
           total_order_value += value

       ti.xcom_push(key="total_order_value", value=total_order_value)


   def load(ti):
       total_order_value = ti.xcom_pull(task_ids="transform", key="total_order_value")
       print("Total order value is: %.2f" % total_order_value)


   extract = PythonOperator(
       task_id="extract",
       python_callable=extract,
   )

   transform = PythonOperator(
       task_id="transform",
       python_callable=transform,
   )

   load = PythonOperator(
       task_id="load",
       python_callable=load,
   )
   
   ###########################
   # 3.タスクの依存関係定義
   # データの依存関係が不明瞭
   ###########################
   extract >> transform >> load

Airflow 2.0 の新しい記法によるDAGの定義

上記で示した1.10.xスタイルで定義したDAGをAirflow 2.0の新しい記法で書き換えてみましょう。

今回はAirflow 2.0をローカルで試すためにAirflow Breezeを使っています。

詳細な使い方はGitHubのAirflowレポジトリ にあるドキュメントを参照してください。​

Airflow 2.0で書き換えたDAG定義全体像は以下になります。1.10.xでの定義と比べPythonOperatorのインスタンス生成やXComの値へのアクセスなどの定型コードがなくなり、データの依存関係がタスク定義から分離されてDAG定義の見通しが大幅に向上しました。

import json

from airflow.utils.dates import days_ago
from airflow.decorators import dag, task

default_args = {
   'owner': 'airflow',
}

###########################
# 1.@dagでDAGの定義
###########################
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=["example"])
def task_flow():
   ###########################
   # 2.@taskでタスクの定義
   # multiple_outputs引数をTrueに設定することで辞書やリストなどの返り値を複数のXComの値にすることも可能
   ###########################
   @task()
   def extract():
       data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
       order_data_dict = json.loads(data_string)

       return order_data_dict

   @task(multiple_outputs=True)
   def transform(order_data_dict: dict):
       total_order_value = 0
       for value in order_data_dict.values():
           total_order_value += value

       return {"total_order_value": total_order_value}

   @task()
   def load(total_order_value: float):
       print("Total order value is: %.2f" % total_order_value)

   
   ###########################
   # 3.明示的なデータの依存関係の定義
   # この関係性からタスクの依存関係が自動的に推測される
   ###########################
   order_data = extract()
   order_summary = transform(order_data)
   load(order_summary["total_order_value"])



dag = task_flow()

今回課題として取り上げた依存関係を表すコード部分だけをAirflow 1.10.xと2.0で比較すると

1.10.x

   extract = PythonOperator(
       task_id="extract",
       python_callable=extract,
   )

   transform = PythonOperator(
       task_id="transform",
       python_callable=transform,
   )

   load = PythonOperator(
       task_id="load",
       python_callable=load,
   )

   extract >> transform >> load

2.0

   order_data = extract()
   order_summary = transform(order_data)
   load(order_summary["total_order_value"])

と、Airflow 2.0ではPythonの書式で宣言的に依存関係が記載でき、シンプルになりました。

おわりに

DAG定義周りで新しく追加された機能を具体例を示しながら紹介してみました。Airflow 2.0ではDAGの効率的な開発という観点でも大幅に改善され、その新しい記法によりDAGの定義をシンプルで明瞭にすることができるようになりました。Airflow 2.0がstableになったらこの記法を使っていきたいと思います。

10日目は「GoogleColabで統計的因果探索手法LiNGAMを動かしてみた」です。次回もよろしくお願いいたします。

参考

Introducing Airflow 2.0
Airflow documentation (developer ver.)
Airflow 2.0 Series (動画)