見出し画像

Airflowのタスク実行環境を分離する

電通デジタルでバックエンド開発をしている松田です。弊社ではデータパイプラインの構築や管理のために主にApache Airflowを利用しています[1, 2]。
本記事では、AirflowのOperatorを使ってタスク実行環境を分離する方法についてご紹介します。

タスク実行環境を分離するモチベーション

はじめになぜAirflowにおけるタスク実行環境を分離したいのかについてですが、これには主に3つ(他の観点もあるかもしれませんが)挙げられると思います。

1つ目はPythonライブラリの競合回避やライブラリの特定のバージョンを使いたい場合です。Airflowのデータパイプライン構築において特定のライブラリを使いたいときに、Dockerを使ったbuildであればDockerfileにライブラリを追記するか、GCPのCloud ComposerであればPYPI PACKAGESからライブラリを追加することが可能です[3]。しかしAirflowがPythonで書かれているためにAirflow自体が様々なPythonライブラリを使用していることや、依存ライブラリが各タスクを実行するすべてのworkerにインストールされる仕様から使用するライブラリが競合する可能性が高い問題があります[4, 5, 6]。

2つ目は特定のタスクに対して別途計算リソースを用意したい場合です。Amazon Elastic Container Service (Amazon ECS)といったコンテナ管理サービスを利用して外部リソースでタスクを実行するようにすれば、Airflowから完全に分離された計算リソースを使ったタスク実行が可能になります。

3つ目はタスクの再利用性やタスクのテストの容易性を高めたい場合です。Airflowではワークフロー定義とタスクで使用するPython関数を同じファイルで記述することが可能ですが、それだとそのタスクの再利用やテストはやや困難になります。ワークフロー定義とそれを構成するタスク実装との結合を疎にすることでタスクの再利用性が高まり、またテストもより容易になります。

以下ではこれらの観点に沿ってAirflow Operatorを用いた分離方法を紹介していきます。

分離方法

タスク実行環境を分離させる方法はいくつか考えられますが、その中でも用意されているAirflow Operatorで簡易に実現するには、PythonVirtualenvOperator、ECSOperator、KubernetesPodOperator、DockerOperatorを使った方法が挙げられます。

1. PythonVirtualenvOperatorを使う
PythonVirtualenvOperatorはvirtualenvによる仮想環境上で関数を実行するOperatorです。Pythonライブラリ競合回避や特定のライブラリバージョンの使用のみが目的であればこのOperatorを使用することが考えられます。このOperatorのrequirements引数で使いたいライブラリとそのバージョンを、python_callable引数でそのライブラリを使用した関数をそれぞれ指定することが可能です。ここで指定したライブラリを使用するためには、呼び出す関数の中でimportする必要があります。

以下のサンプルコードではstatsmodelsライブラリを使ったsample_funcという関数を定義してPythonVirtualenvOperatorタスクで実行しています。
※ 以降サンプルコードのAirflowバージョンは1.10.2または1.10.3です。

・呼び出す関数

def sample_func():
   import statsmodels.api as sm

   y = [1, 3, 4, 5, 2, 3, 4]
   x = range(1, 8)

   model = sm.OLS(y, sm.add_constant(x))
   results = model.fit()

   return results.params


・Airflowタスク

sample_task = PythonVirtualenvOperator(
   task_id="sample_task",
   python_callable=sample_func,
   requirements=["statsmodels"],
   python_version=sys.version_info[0]
)

PythonVirtualenvOperatorで完結するのでお手軽です。ただ、DAG実行時関数内にエラーがあるとsubprocessのエラーとして出てきたりするのでDAG実行でのデバッグは少し困難があることや、入らないライブラリがあったりします[1]。

2. ECSOperator (or KubernetesPodOperator)を使う
※ 私の所属の環境がAWSなので主にECSOperatorについての話になります。
ECSOperatorやKubernetesPodOperatorはコンテナ化されたタスクをAmazon ECSもしくはKubernetes Pod上で実行します。コンテナベースのタスク実装が前提となるのでワークフロー定義からタスク実装が分離され、またAmazon ECSであれば実行環境がAirflow workerからECSタスク定義で設定した計算リソースを持つECSクラスターへ分離されるので上述の分離モチベーション全てを満たすことができます。
ECSOperatorのtask_definition引数にはECSで作成したタスク定義名、cluster引数にはECS cluster名、launch_typeにはFARGATEもしくはEC2が指定できます。引数詳細はAirflowかboto3のドキュメントを参照してください。
※ ECSOperatorではboto3を使用するので環境によっては予めインストールする必要があります。

以下のECSOperatorを用いたサンプルコードでは、作成しておいたsample-def-mtdというECSタスク定義を呼び出してECSでタスクを実行します。

・Airflowタスク

sample_task = ECSOperator(
   task_id="sample_task",
   task_definition="sample-def-mtd:2",
   cluster="airflow-task-proc",
   overrides={
       "containerOverrides": [
           {
               "name": "sample-con",
               "cpu": 256,
               "memory": 1024
           }
       ]
   },
   aws_conn_id="aws_conn",
   launch_type="FARGATE",
   network_configuration={
       "awsvpcConfiguration": {
           "subnets": ["subnet-xxxxxxxxxx"],
           "securityGroups": ["sg-xxxxxxx"]
       }
   }
)


ECSタスク定義とAmazon ECRにpushしたDockerイメージのタスク実装サンプルは下記になります。

・ECSタスク定義: sample-def-mtd

{
 "executionRoleArn": "arn:aws:iam::xxxxxx:role/ecsTaskExecutionRole",
 "containerDefinitions": [
   {
     "logConfiguration": {
       "logDriver": "awslogs",
       "options": {
         "awslogs-group": "/ecs/sample-def-mtd",
         "awslogs-region": "ap-northeast-1",
         "awslogs-stream-prefix": "ecs"
       }
     },
     "portMappings": [],
     "environment": [
       {
         "name": "S3_BUCKET_NAME",
         "value": "sample-task-output"
       }
     ],
     "image": "xxxxxx.dkr.ecr.ap-northeast-1.amazonaws.com/hoge:123abc",
     "name": "sample-con"
   }
 ],
 "placementConstraints": [],
 "taskRoleArn": "arn:aws:iam::xxxxxx:role/sample-mtd",
 "family": "sample-def-mtd",
 "requiresCompatibilities": [
   "FARGATE"
 ],
 "networkMode": "awsvpc",
 "cpu": "256",
 "memory": "1024"
}


・JSONをS3にアップするだけの実行タスクサンプル

import json
import os
import boto3

sample_json = {"key": [1, 2, 3, 4, 3, 4]}

s3 = boto3.client(
   service_name="s3",
   region_name="ap-northeast-1"
)

s3.put_object(
   Body=json.dumps(sample_json),
   Bucket=os.environ["S3_BUCKET_NAME"],
   Key="sample_dag1_task1.json"
)

このOperatorを使うにはDockerイメージの作成、Dockerコンテナレジストリへのpush、ECSタスク定義の設定などがあり少し煩雑になります。またECSOperatorは、Airflow 1.10.5未満のバージョンではECSのログを確認するのにCloudWatchなどを見る必要がありますが、1.10.5以降ではECSOperatorのawslogsの引数を使用することでAirflow UI上でログの確認ができるようになっていました[7]。またKubernetesPodOperatorではXCom周りの引数が用意されているのですがECSOperatorは現在のところないのでタスクに入出力がある場合はS3などへ入出力するといった実装が必要になります。

3. DockerOperatorを使う
DockerOperatorは、指定したDockerイメージから作成されるDockerコンテナに実行環境が分離されます。Pythonライブラリの競合を回避でき、かつPythonライブラリが入らない問題に直面しませんがECSOperatorなどで実行したほうが計算リソースの観点から安全なので個人的にはDockerOperatorの使用状況は限られるのではないかと思います。
※ dockerライブラリを使用するので環境によっては予めインストールする必要があります。


まとめ

本記事ではAirflowにおけるタスク実行環境の分離方法について簡単に紹介しました。これまでAirflowを使ってきた中で、Pythonライブラリの競合や計算リソースの配慮などを考えるとPythonOperatorで独自に定義した関数を実行するタスクは避けて、タスク実行環境が分離される方法で実装するのが良いのではないかと感じました。
上記挙げたOperatorの他にも機械学習特有のタスクであればAWSのSageMaker系Operatorの使用など、そのタスク内容によってはより適した外部リソースをキックするOperatorも候補に挙げられると思います。
Airflowはバグ修正や改善、新規機能追加が現在も活発に行われておりさらに使いやすくなっていくのではと思います。

参考

[1] https://www.slideshare.net/potix2_jp/airflow-224004058?ref=https://shinjuku-geek-lounge.connpass.com/event/160725/presentation/
[2] https://speakerdeck.com/matsudan/pycon-jp-2019-xin-mi-pythonistagazeng-ruairflowru-men-and-huo-yong-shi-li-shao-jie
[3] https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies
[4] https://github.com/apache/airflow
[5] https://medium.com/bluecore-engineering/were-all-using-airflow-wrong-and-how-to-fix-it-a56f14cb0753
[6] https://cloud.google.com/composer/docs/how-to/using/testing-dags
[7] https://github.com/apache/airflow/releases/tag/1.10.5