見出し画像

【IoT】SQS + Lambda という⾮同期処理を実装してみた

IoTシステム構築において、クラウドサービス側から見て外部のサービスを利用する場合は、可用性を高めるため、1つのLambda関数で実装するのではなく、Lambda→SQS→Lambdaという構成が良い、という情報を得ました。

SQS + Lambdaについて調べていると、同期処理(密結合)、非同期処理(疎結合)という概念が重要であることがわかってきました。本記事では、 これらの点も踏まえ、気づきをまとめます。


■ 同期処理(密結合)と非同期処理(疎結合)

まず、同期処理と非同期処理について考えます。

なお、密結合とは、システムの各部分が強く連携しており、変更や障害が他の部分に影響を及ぼしやすい状態です。一方、疎結合とは、各部分が独立しており、変更や障害が他の部分に影響を及ぼしにくい状態です。

◎ 同期処理は、工程1と工程2が直結してる状態

製造ラインで例えると、同期処理(密結合)は、工程1と工程2がコンベアで直結している状態です。工程1の出力が、そのまま工程2の入力となります。1つのLambda関数で工程1と工程2を実行するイメージです。

◎ 非同期処理は、工程間に仕掛品置き場がある状態

一方、非同期処理(疎結合)は、工程1と工程2の間に仕掛品置き場がある状態です。工程1の出力は一旦仕掛品置き場に溜められ、仕掛品置き場に置かれたモノが工程2の入力となります。この仕掛品置き場が、Amazon SQSです。

■ 非同期処理の4つのメリット

参考にしたスライドでは、以下の4つのメリットが挙げられています。

引用:https://speakerdeck.com/pikosan0000/sqs-lambda-devday2022?slide=10

4つのメリットの中でも、IoTシステム構築における1番のメリットは、耐障害性・可用性の向上だと感じました(後述)。

引用:https://speakerdeck.com/pikosan0000/sqs-lambda-devday2022?slide=13

■ 同期処理と非同期処理の切り分け指針

では、どのように処理を切り分ければいいのでしょうか。一つの切り分け指針は、AWS内部サービスと、外部サービスとを切り分けることです。

例えば、IoTデータを受け取り、DynamoDBに保存し、通知判断をし、通知が必要な場合はLINE Notifyで通知する、というIoTシステムを考えます。

この一連の流れのうち、LINE NotifyはAWSから見て外部サービスです。この外部サービスをSQSの後に配置することで、IoTシステム全体の可用性を高めることができます。

通知判断(=工程1)のLambda関数は、IoTデータを入力とし、DynamoDBへの保存と通知判断を行い、通知が必要な場合は通知関連情報を出力します。

この出力は、SQS(=仕掛品置き場=バッファ)に、いったん置かれます。

SQSにデータが溜まったことをトリガーに、LINE通知(=工程2)のLambda関数は通知関連データをSQSから取得し、LINE通知を実行します。LINE通知実行後、SQSからデータを削除します。

外部サービス(LINE Notify)が一時的にダウンした場合でも、SQSにデータが保存され、サービス復旧後に自動的に処理されます。

LINE通知以外にも、例えば、IoTデータをGoogleスプレッドシートに保存したい、という場合も、SQS + Googleスプレッドシート保存用Lambdaという構成で、可用性を高めた実装が可能です。

■ Lambda1 + SQS +Lambda2の実装例

コードの概要を述べます。

◎ SQSにデータを入力するLambda例

SQS作成するとURLを取得できるので、このURLをコード内で指定しています。また、このLambdaのIAMロールにSQSへのアクセス権限を与える必要があります。

import boto3
import json

def lambda_handler(event, context):
    # SQS クライアントを作成
    sqs = boto3.client('sqs')
    queue_url = 'https://sqs.YOUR_REGION.amazonaws.com/YOUR_ACCOUNT_ID/YOUR_QUEUE_NAME'  # 実際のキュー URL に置き換えてください

    # 送信するメッセージを定義
    message = {
        'message': 'こんにちは、Lambda2!'
    }

    # メッセージを SQS に送信
    response = sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(message)
    )

    print(f"SQS にメッセージを送信しました。MessageId: {response['MessageId']}")
    return {
        'statusCode': 200,
        'body': json.dumps('メッセージを送信しました')
    }

◎ SQSからのデータを処理するLambda例

このLambdaのトリガーとして、作成したSQSを設定します。

import json

def lambda_handler(event, context):
    for record in event['Records']:
        # SQS メッセージの本文を取得
        body = json.loads(record['body'])
        message = body.get('message', 'メッセージがありません')

        # メッセージを処理(ここでは単に出力)
        print(f"受信したメッセージ: {message}")

    return {
        'statusCode': 200,
        'body': json.dumps('メッセージの処理が完了しました')
    }

■ まとめと今後の課題

SQS+Lambdaが良いという話を聞き、かつわかりやすい資料を発見できたおかげで、数時間で実装することができました。また、実際に手を動かすことで、SQS、非同期処理、疎結合の概要を理解することができました。

SQS + Lambdaに加え、Step Functionsにも興味があり、この有用性も調べてみたいと思っています。

本記事が、SQS + Lambdaを実装される方の、参考になれば幸いです。

■ 参考文献

・この登壇がめちゃくちゃわかりやすいです!

・上記登壇のスライドが公開されています!


この記事が気に入ったらサポートをしてみませんか?