
Apache Airflow初心者向けに箇条書きで簡単に説明する
特徴
Pythonで書かれているのでforループも自由だしpandasなどの外部モジュールも使えるしAWSなどパブリッククラウドとの連携も容易
定期実行のスケジューリングも単発実行も可能だからバッチジョブでもML用途でもレポーティングにもデータバックアップにも使える
DAGを使ってタスク間の順序を自在に構築できる
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.dummy_operator import DummyOperatorwith DAG(
"etl_sales_daily",
start_date=days_ago(1),
schedule_interval=None,
) as dag:
task_a = DummyOperator(task_id="task_a")
task_b = DummyOperator(task_id="task_b")
task_c = DummyOperator(task_id="task_c")
task_d = DummyOperator(task_id="task_d") task_a >> [task_b, task_c]
task_c >> task_d

主要コンポーネント
オペレーターはロジックを関数で包んだもの
センサーはなにかの完了を待つ役割
フックは外部との連携
from airflow.models.baseoperator import BaseOperatorclass GCSCreateBucketOperator(BaseOperator):
def __init__(self, *, bucket_name: str, **kwargs):
super().__init__(**kwargs)
self.bucket_name = bucket_name def execute(self, context):
hook = GCSHook()
hook.create_bucket(self.bucket_name)
XComはDAG内のタスク間での通信に使われるキーバリュー形式のテーブルだが、容量は48KBしかいないのでデータ自体を格納するのではなくデータを保存した場所を格納するイメージ
デプロイについて
3つのコンポーネントがあってすべてairflowコマンドで操作できる
スケジューラーは司令塔でDAGファイルを解析してタスクのスケジューリングなどを行う
Webサーバーはパイプラインの状況をweb上で確認するためのインターフェースを提供すする
ワーカーは実際にタスクを実行するもの
別にエグゼキューターという概念があり、ワーカーを単体で動かすのか、分散して動かすのか、Kubernetesを使うのか、デバッグ用に動かうのか、といったことを決定する
Airflowの始めるには既に用意されたDockerイメージを使うとよい
特にKubernetes上で動かすならHelmチャートを利用するとよい
DAG配布をどうするか
スケジューラーとワーカーは両方ともDAGにアクセスできる必要があるので次の3つの方法のいずれかでAirflowデプロイ環境にDAGを配布しなければならない
共有ファイルサーバーを使うと楽だが読み取りに時間がかかる恐れがある
git-syncなどを使うとローカルリポジトリとリモートリポジトリを同期でいるので良い
他にもDockerイメージにDAGを焼き込む方法があるがDAG更新のたびにビルドし直しになる
いいなと思ったら応援しよう!
