Celeryでタスク実行されるまでの流れ #422
CeleryはPythonの分散タスクキュー(もしくは非同期タスクキュー)として使用される技術です。分散タスクキューでは、メッセージブローカーを介してタスク(作業の単位)を送受信し、複数のワーカープロセスがこれらのタスクを非同期に実行します。
Celeryの主な用途:
●非同期処理
●分散・並列処理
●スケジューリング
●耐障害性と拡張性
Djangoとも簡単に連携でき、バックグラウンドで計算処理などを実行する時に便利です。
私が所属するチームではアプリケーション本体が起動するAPI用コンテナと、バッチ処理専用のCelery用コンテナが別々で走っていて、これがどのように連携しているのかきちんと理解できていなかったので整理しました。
アプリケーション本体からタスクを送る
ざっくり以下の流れです。
タスク化したい処理を関数にして「@app.task」デコレーターをつける
関数を「.delay()」や「.apply_async()」で実行する
タスクをシリアライゼーションしてメッセージブローカーに送る
Celeryワーカーがメッセージブローカーをポーリングし、タスクがあれば処理する
ChatGPT先生によるサンプルコードです。
タスク化したい処理に「@app.task」をつけて、
# tasks.py
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest:guest@localhost//')
@app.task
def add(x, y):
return x + y
delayやapply_asyncで実行します。
# another_module.py
from tasks import add
result = add.delay(4, 4)
メッセージブローカーはcelery.pyやDjangoのsettings.pyで設定できます。
# celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
_redis = "redis://{host}:{port}/{db}".format(**REDIS_CONFIGS['your_config'])
app = Celery(
'myproject',
broker=_redis,
backend='rpc://', include=['myproject.tasks']
)
タスクが送信されるのは理解できましたが、Celeryがどうやって関数の内容(タスクの処理内容)を把握しているのかが分かりませんでした。
これもChatGPT先生に聞いたところ、以下のように教えてくれました。
まぁつまり必要な情報は良い感じにして渡してくれているということですね。
タスクをスケジューリングする
Celeryはタスクのスケジューリングが可能です。私のチームでは日次バッチをスケジューリングして実行しています。
その場合、ざっくり以下の流れになります。
「celery beat」が設定に基づいてタスクをスケジュールする
時間になるとタスクをメッセージブローカーに送る(キューイング)
Celeryワーカーがメッセージブローカーをポーリングしてタスクを処理する
以下のように定義できます。
日次バッチの設定例です。
# celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
_redis = "redis://{host}:{port}/{db}".format(**REDIS_CONFIGS['your_config'])
app = Celery(
'myproject',
broker=_redis,
backend='rpc://', include=['myproject.tasks']
)
app.conf.beat_schedule = {
'daily_batch': {
'task': 'tasks.exec_daily_batch',
'schedule': crontab(minute="30", hour='0', day_of_week='mon-fri'),
},
}
Celeryは別コンテナに切り出せる
ここまで見ていただければ分かるように、Celeryとアプリケーション本体の間に中継時点(メッセージブローカー)があることで、それぞれ別々の環境(コンテナ)で動いていても、システム全体としてスムーズに動作します。
ここまでお読みいただきありがとうございました!