【IoT】SQS + Lambda という⾮同期処理を実装してみた
IoTシステム構築において、クラウドサービス側から見て外部のサービスを利用する場合は、可用性を高めるため、1つのLambda関数で実装するのではなく、Lambda→SQS→Lambdaという構成が良い、という情報を得ました。
SQS + Lambdaについて調べていると、同期処理(密結合)、非同期処理(疎結合)という概念が重要であることがわかってきました。本記事では、 これらの点も踏まえ、気づきをまとめます。
■ 同期処理(密結合)と非同期処理(疎結合)
まず、同期処理と非同期処理について考えます。
なお、密結合とは、システムの各部分が強く連携しており、変更や障害が他の部分に影響を及ぼしやすい状態です。一方、疎結合とは、各部分が独立しており、変更や障害が他の部分に影響を及ぼしにくい状態です。
◎ 同期処理は、工程1と工程2が直結してる状態
製造ラインで例えると、同期処理(密結合)は、工程1と工程2がコンベアで直結している状態です。工程1の出力が、そのまま工程2の入力となります。1つのLambda関数で工程1と工程2を実行するイメージです。
◎ 非同期処理は、工程間に仕掛品置き場がある状態
一方、非同期処理(疎結合)は、工程1と工程2の間に仕掛品置き場がある状態です。工程1の出力は一旦仕掛品置き場に溜められ、仕掛品置き場に置かれたモノが工程2の入力となります。この仕掛品置き場が、Amazon SQSです。
■ 非同期処理の4つのメリット
参考にしたスライドでは、以下の4つのメリットが挙げられています。
4つのメリットの中でも、IoTシステム構築における1番のメリットは、耐障害性・可用性の向上だと感じました(後述)。
■ 同期処理と非同期処理の切り分け指針
では、どのように処理を切り分ければいいのでしょうか。一つの切り分け指針は、AWS内部サービスと、外部サービスとを切り分けることです。
例えば、IoTデータを受け取り、DynamoDBに保存し、通知判断をし、通知が必要な場合はLINE Notifyで通知する、というIoTシステムを考えます。
この一連の流れのうち、LINE NotifyはAWSから見て外部サービスです。この外部サービスをSQSの後に配置することで、IoTシステム全体の可用性を高めることができます。
通知判断(=工程1)のLambda関数は、IoTデータを入力とし、DynamoDBへの保存と通知判断を行い、通知が必要な場合は通知関連情報を出力します。
この出力は、SQS(=仕掛品置き場=バッファ)に、いったん置かれます。
SQSにデータが溜まったことをトリガーに、LINE通知(=工程2)のLambda関数は通知関連データをSQSから取得し、LINE通知を実行します。LINE通知実行後、SQSからデータを削除します。
外部サービス(LINE Notify)が一時的にダウンした場合でも、SQSにデータが保存され、サービス復旧後に自動的に処理されます。
LINE通知以外にも、例えば、IoTデータをGoogleスプレッドシートに保存したい、という場合も、SQS + Googleスプレッドシート保存用Lambdaという構成で、可用性を高めた実装が可能です。
■ Lambda1 + SQS +Lambda2の実装例
コードの概要を述べます。
◎ SQSにデータを入力するLambda例
SQS作成するとURLを取得できるので、このURLをコード内で指定しています。また、このLambdaのIAMロールにSQSへのアクセス権限を与える必要があります。
import boto3
import json
def lambda_handler(event, context):
# SQS クライアントを作成
sqs = boto3.client('sqs')
queue_url = 'https://sqs.YOUR_REGION.amazonaws.com/YOUR_ACCOUNT_ID/YOUR_QUEUE_NAME' # 実際のキュー URL に置き換えてください
# 送信するメッセージを定義
message = {
'message': 'こんにちは、Lambda2!'
}
# メッセージを SQS に送信
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(message)
)
print(f"SQS にメッセージを送信しました。MessageId: {response['MessageId']}")
return {
'statusCode': 200,
'body': json.dumps('メッセージを送信しました')
}
◎ SQSからのデータを処理するLambda例
このLambdaのトリガーとして、作成したSQSを設定します。
import json
def lambda_handler(event, context):
for record in event['Records']:
# SQS メッセージの本文を取得
body = json.loads(record['body'])
message = body.get('message', 'メッセージがありません')
# メッセージを処理(ここでは単に出力)
print(f"受信したメッセージ: {message}")
return {
'statusCode': 200,
'body': json.dumps('メッセージの処理が完了しました')
}
■ まとめと今後の課題
SQS+Lambdaが良いという話を聞き、かつわかりやすい資料を発見できたおかげで、数時間で実装することができました。また、実際に手を動かすことで、SQS、非同期処理、疎結合の概要を理解することができました。
SQS + Lambdaに加え、Step Functionsにも興味があり、この有用性も調べてみたいと思っています。
本記事が、SQS + Lambdaを実装される方の、参考になれば幸いです。
■ 参考文献
・この登壇がめちゃくちゃわかりやすいです!
・上記登壇のスライドが公開されています!