![見出し画像](https://assets.st-note.com/production/uploads/images/57760311/rectangle_large_type_2_37c24a3d28b132ccb5967ab429f8c082.png?width=1200)
[AWS] SQS #2 - EC2, SQS, Lambdaを組み合わせてメッセージの連動を行う
■ 概要
SQSを用いたシステム間のメッセージ連動を想定して、EC2(Amazon Linux 2)上のアプリケーション(SDK for Java)からSQSを通して、Lambdaの処理を実行する、という構成を実装してみた。
■ 目標構成
前回の記事「[AWS] SQS #1 - テスト環境を作ってみた (SDK for Java)」 の構成を引き続き利用する。今回は連動用のキューを用意して、そのキューへのデータPUTをトリガーに、Lambdaの関数が実行される仕組みになっている。
■ 目次
・キューの作成 (SQS)
・メッセージ送信用プログラムの作成 (Java)
・Lambda関数の実装 (Python)
・メッセージ連動の確認 (Cloudwatch)
■ キューの作成 (SQS)
「MyTestQueue-001.fifo」を作成
■ メッセージ送信用プログラムの作成 (Java)
SDK for Javaでサンプルとして公開されているプログラムでは、一件のみの送信方法しかなかったため、複数メッセージを送信できるように改良してみました。
SendMultipleMessagesSQS.java
import java.util.List;
import java.util.Map.Entry;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.DeleteQueueRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import java.time.LocalDateTime;
public class SendMultipleMessagesSQS {
public static void main(String[] args) throws Exception {
// Create Client
final AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
// Define Queue Url
final String queueUrl = "https://sqs.ap-northeast-1.amazonaws.com/157634048600/MyTestQueue-001.fifo";
// Define the number of messages you send to SQS
int sendMsgNum = 10;
// Display processing start time
System.out.println("===========================================");
System.out.println("Start Sending at : " + LocalDateTime.now().toString());
System.out.println("===========================================");
// Send Messages
for (int i=0; i<sendMsgNum; i++){
// Difine the sequence number for DedupulicationId which is necessary for FIFO Queue
String seqNum = String.valueOf(i);
// Create Messages to send
SendMessageRequest sendMsgRequest = new SendMessageRequest()
.withQueueUrl(queueUrl)
.withMessageBody("Test Message " + seqNum)
.withMessageDeduplicationId("dedepId_" + seqNum)
.withMessageGroupId("Test001");
// Send Message
sqs.sendMessage(sendMsgRequest);
}
// Display processing start time
System.out.println("===========================================");
System.out.println("Finish Sending at : " + LocalDateTime.now().toString());
System.out.println("===========================================");
}
}
実行用のディレクトリ(SendMultipleMessagesSQS)を作成。ディレクトリ/ファイル権限はサンプルと合わせておきました。
[ec2-user@ip-172-31-36-63 samples]$ mkdir SendMultipleMessagesSQS
[ec2-user@ip-172-31-36-63 samples]$ chmod 755 SendMultipleMessagesSQS
[ec2-user@ip-172-31-36-63 samples]$ ls -ld SendMultipleMessagesSQS
drwxr-xr-x 2 ec2-user ec2-user 6 Jul 29 01:37 SendMultipleMessagesSQS
必要なファイルのアップロード
・build.xml
・SendMultipleMessagesSQS.java
[ec2-user@ip-172-31-36-63 SendMultipleMessagesSQS]$ ll
total 8
-rw-r--r-- 1 ec2-user ec2-user 688 Jul 29 00:55 build.xml
-rw-r--r-- 1 ec2-user ec2-user 2387 Jul 29 01:31 SendMultipleMessagesSQS.java
[ec2-user@ip-172-31-36-63 SendMultipleMessagesSQS]$ chmod 664 *
[ec2-user@ip-172-31-36-63 SendMultipleMessagesSQS]$ ll
total 8
-rw-rw-r-- 1 ec2-user ec2-user 688 Jul 29 00:55 build.xml
-rw-rw-r-- 1 ec2-user ec2-user 2387 Jul 29 01:31 SendMultipleMessagesSQS.java
いざ、実行
[ec2-user@ip-172-31-36-63 SendMultipleMessagesSQS]$ ant
Buildfile: /home/ec2-user/aws_sdk_java/aws-java-sdk-1.12.30/samples/SendMultipleMessagesSQS/build.xml
run:
[javac] /home/ec2-user/aws_sdk_java/aws-java-sdk-1.12.30/samples/SendMultipleMessagesSQS/build.xml:12: warning: 'includeantruntime' was not set, defaulting to build.sysclasspath=last; set to false for repeatable builds
[javac] Compiling 1 source file to /home/ec2-user/aws_sdk_java/aws-java-sdk-1.12.30/samples/SendMultipleMessagesSQS
[javac] warning: Supported source version 'RELEASE_7' from annotation processor 'com.amazonaws.eclipse.simpleworkflow.asynchrony.annotationprocessor.AsynchronyDeciderAnnotationProcessor' less than -source '1.8'
[javac] 1 warning
[java] ===========================================
[java] Start Sending at : 2021-07-29T01:54:51.438
[java] ===========================================
[java] ===========================================
[java] Finish Sending at : 2021-07-29T01:54:52.499
[java] ===========================================
BUILD SUCCESSFUL
Total time: 5 seconds
何とかうまく動きました。送信したメッセージがキューに滞留しているかを確認します。
きちんと10件対象のキューに滞留していました👏
■ Lambda関数の実装 (Python)
関数を作成
中身は Python で実装。以下のように取得したメッセージをログに出すため、eventを出力するようにした。
import json
def lambda_handler(event, context):
print(event)
# TODO implement
return {
'statusCode': 200,
'body': json.dumps('Hello from Lambda!')
}
作成したキューとの紐づけを行う。Lambdaのコンソール画面からトリガーを追加を選択して、対象のキューを選んでいく。以下のように「有効化」にチェックを入れた状態で追加を押すと、すでに滞留しているメッセージを取得してくるはず!
トリガーが追加されているのがコンソール画面からわかる。
これにて実装はすべて完了♪
■ メッセージ連動の確認 (Cloudwatch)
最後に、メッセージが連動されているかを確認する。Lambdaコンソール画面から、「モニタリング」を選択すると、「CloudWatchのログを表示」とあるので、そこを選択。
するといかのようにログが出力されているのがわかる。
実際にログの中身を見てみる。
Lambdaの処理開始、終了のログも出てるし、{ 'Records' }の中に受信したメッセージの詳細も出力されている。
先ほど滞留していたキューのメッセージも以下のように 0件になり、正常にメッセージが連動できていることがわかる。
SQSには連動できるメッセージサイズの上限が256KB、という制約があるため、次回はその課題を解決するための方法を検証したいと思います。