見出し画像

【メッセージ・ブローカー】RabbitMQを通じてメッセージキューの理解と実践 - Springboot3 第16回


はじめに


こんにちは、今日はメッセージブローカーで有名なRabbitMQについて紹介したいと思います。 RabbitMQは分散アプリケーションやサービス間での非同期メッセージングを可能にするために使用されます。RabbitMQは、AMQP(Advanced Message Queuing Protocol)プロトコルを基に設計されています。

AMQPは、メッセージ指向ミドルウェア通信のためのオープンな標準プロトコルです。分散システム間の効率的で安定した通信のために設計されました。 Producer, Queue, Consumer, Exchangeが重要概念です。

RabbitMQの重要概念


RabbitMQの構造

Producer: メッセージを送るアプリケーション。コンシューマーに直接送るのではなく、RabbitMQブローカーに送ります。

Consumer: メッセージを読むアプリケーション

Queue: メッセージを格納するバッファまたはストレージ。メッセージが読み込まれるとキューからメッセージが削除されるので、メッセージは正確に一度だけ処理されます。

Message: ProducerからRabbitMQを介してConsumerに送られる情報

Exchange: Producerとキューの仲介者。キューに直接メッセージを送る代わりに、Producerがexchangeにメッセージを送ると、exchangeはキューに一つ以上のキューにメッセージを送ります。このとき、特定のルールに従います。そのため、Producerはメッセージを受け取るキューが何であるかを知る必要はありません。

Routing Key: exchangeがメッセージをキューにどのようにルーティングするかを決定するためのキー。メッセージのアドレスと考えることができます。

Binding: exchange と queue の間のリンク。ルーティングキーを利用して決定されます。


RabbitMQを利用するアーキテクチャの例


Kafka VS RabbitMQ


1.方式の違い

Kafkaはpub/sub方式、RabbitMQはメッセージブローカー方式です。Kafkaのpub/sub方式は生産者中心の設計で構成されています。 生成者が希望する各メッセージを公開できるようにするメッセージ配布パターンで進行されます。

一方、RabbitMQのメッセージブローカー方式はブローカー中心の設計で構成されます。指定された受信者にメッセージを確認、ルーティング、保存及び配信する役割を遂行し、メッセージ伝達の保証に焦点を当てます。

2.転送されたメッセージに対する揮発性

RabbitMQはqueueに保存されていたメッセージに対してEvent Consumerが持ち去られるとqueueからそのメッセージを削除します。
しかし、Kafkaはコンストラクタからメッセージが来ると、そのメッセージをtopicに分類し、これをevent streamerに保存します。 その後、受信者が特定のtopicに対するメッセージを取っても、event streamerはそのtopicを継続的に維持するため、特定の状況が発生しても再生が可能です。

3.用途の違い

Kafkaはクラスターを通じた並列処理が主な差別点であるため、膨大な量のデータを処理する時、利点が浮き彫りになります。

RabbitMQはデータ処理よりManage UIを提供するため、管理的な側面や、様々な機能実装のためのサービスを構築する時、メリットが浮き彫りになります。


実装過程


1. DockerによりRabbitMQのインストール


 Dockerの起動
https://hub.docker.com/_/rabbitmq
DockerHubにアクセス


docker pull rabbitmq:3.12.6-management
コマンドでドッカーハブからドッカーイメージを受け取り、
ローカルドッカーデスクトップに保存します。
Imageを確認するとRabbitMQが入っています。
docker run --rm -it -p 15672:15672 -p 5672:5672 rabbitmq:3.12.6-management
このコマンドで15662はRabbitMQ managementウェブサイト、5672はRabbitMQ Client接続として使用されます。


RabbitMQ管理UIを使用したRabbitMQの探索
RabbitMQ管理UIは非常に便利なRabbitMQ管理プラグインです。

localhost:15672
Username: guest
Password: guest
Exchangesをクリック。
Nameを入力します。
Queues and Streamsをクリックし、
 Nameを入力。
Exchangesをクリックし、
exchange_demoで、
  バインディングするqueueとRoutingKeyを入力します。
 Queues and Streamsで Bindingを確認。
 exchange_demo -> routing_key_demo
-> This queue(queue_demo)
Exchangesからメッセジーを送ってみます。
 Queues and Streams、 
Queue_demo、
Get Messagesをクリックすると、
Payloadにメッセジーが表示されます。


2. Spring Boot プロジェクトの準備

https://start.spring.io/
ここでRabbitMQ、Web、Lombok依存性を追加します。
https://spring.io/projects/spring-amqp
参考としてのSpring AMQPのドキュメント


3. Spring BootとRabbitMQの連結

 Spring Bootで Queue, Exchange, Binding, Producer, Consumerを実装します。
 もともとは、上記のように設定するべきですが、
基本的値なので、不要。


4. RabbitMQの設定

RabbitMQConfigクラスで、
Queue, Exchange, Binding, Routing Keyを作成します。
文字列とJSONの形で使えるように、
Queue、Routing Key, Bindingは 二つづつ作成します。
@Configuration
public class RabbitMQConfig {

    @Value("${rabbitmq.queue.name}")
    private String queue;

    @Value("${rabbitmq.queue.json.name}")
    private String jsonQueue;

    @Value("${rabbitmq.exchange.name}")
    private String exchange;

    @Value("${rabbitmq.routing.key}")
    private String routingKey;

    @Value("${rabbitmq.routing.json.key}")
    private String routingJsonKey;

    // spring bean for rabbitmq queue
    @Bean
    public Queue queue(){
        return new Queue(queue);
    }

    // spring bean for queue (store json messages)
    @Bean
    public Queue jsonQueue(){
        return new Queue(jsonQueue);
    }

    // spring bean for rabbitmq exchange
    @Bean
    public TopicExchange exchange(){
        return new TopicExchange(exchange);
    }

    // binding between queue and exchange using routing key
    @Bean
    public Binding binding(){
        return BindingBuilder
                .bind(queue())
                .to(exchange())
                .with(routingKey);
    }

    // binding between json queue and exchange using routing key
    @Bean
    public Binding jsonBinding(){
        return BindingBuilder
                .bind(jsonQueue())
                .to(exchange())
                .with(routingJsonKey);
    }

    @Bean
    public MessageConverter converter(){
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(converter());
        return rabbitTemplate;
    }

    
}
  1. プロパティの構成:

    • クラスは @Value アノテーションを使用してアプリケーションプロパティから値をインジェクションします(rabbitmq.queue.name、rabbitmq.queue.json.name、rabbitmq.exchange.name、rabbitmq.routing.key、rabbitmq.routing.json.key)。

  2. キューの構成:

    • queue() メソッド:queue プロパティを使用して非JSONキューのBeanを定義します。

    • jsonQueue() メソッド:jsonQueue プロパティを使用してJSONキューのBeanを定義します。

  3. エクスチェンジの構成:

    • exchange() メソッド:exchange プロパティを使用してトピックエクスチェンジのBeanを定義します。

  4. バインディングの構成:

    • binding() メソッド:routingKey を使用して非JSONキューとエクスチェンジの間のバインディングを定義します。

    • jsonBinding() メソッド:routingJsonKey を使用してJSONキューとエクスチェンジの間のバインディングを定義します。

  5. メッセージコンバータの構成:

    • converter() メソッド:JavaオブジェクトをJSON形式に変換するために使用される Jackson2JsonMessageConverter のBeanを定義します。

  6. AMQPテンプレートの構成:

    • amqpTemplate() メソッド: ConnectionFactory を使用して構成された RabbitTemplate のBeanを定義します。このテンプレートはメッセージ変換に Jackson2JsonMessageConverter を使用します。


JSONメッセジーのためのDTOとして、
Userクラスも作成。
@Data
public class User {
    private int id;
    private String firstName;
    private String lastName;
}


5. Producerの作成

RabbitMQキューにメッセージを生成するための2つのクラス、RabbitMQProducerRabbitMQJsonProducerです

@Service
public class RabbitMQProducer {

    @Value("${rabbitmq.exchange.name}")
    private String exchange;

    @Value("${rabbitmq.routing.key}")
    private String routingKey;

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQProducer.class);

    private RabbitTemplate rabbitTemplate;

    public RabbitMQProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendMessage(String message){
        LOGGER.info(String.format("전송된 메시지 -> %s", message));
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }
}

sendMessageメソッド

  • このメソッドはインジェクトされたRabbitTemplateを使用してプレーンテキストメッセージをRabbitMQの交換に送信します。

package net.javaguides.springbootrabbitmqtutorial.publisher;

import net.javaguides.springbootrabbitmqtutorial.dto.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class RabbitMQJsonProducer {

    @Value("${rabbitmq.exchange.name}")
    private String exchange;

    @Value("${rabbitmq.routing.json.key}")
    private String routingJsonKey;

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonProducer.class);

    private RabbitTemplate rabbitTemplate;

    public RabbitMQJsonProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendJsonMessage(User user){
        LOGGER.info(String.format("Json message sent -> %s", user.toString()));
        rabbitTemplate.convertAndSend(exchange, routingJsonKey, user);
    }

}

sendJsonMessageメソッド

  • このメソッドはJSONメッセージ(Userオブジェクト)をRabbitMQの交換に送信します。インジェクトされたRabbitTemplateを使用します。


6. APIの作成

テキストメッセージをRabbitMQに送信するためのMessageController
JSONメッセージをRabbitMQに送信するためのMessageJsonController
@RestController
@RequestMapping("/api/v1")
public class MessageController {

    private RabbitMQProducer producer;

    public MessageController(RabbitMQProducer producer) {
        this.producer = producer;
    }

    // http://localhost:8080/api/v1/publish?message=hello
    @GetMapping("/publish")
    public ResponseEntity<String> sendMessage(@RequestParam("message") String message){
        producer.sendMessage(message);
        return ResponseEntity.ok("Message sent to RabbitMQ ...");
    }
}

/publishエンドポイントでは、GETメソッドがmessageクエリパラメータを受け取り、それをRabbitMQに送信します。

@RestController
@RequestMapping("/api/v1")
public class MessageJsonController {

    private RabbitMQJsonProducer jsonProducer;

    public MessageJsonController(RabbitMQJsonProducer jsonProducer) {
        this.jsonProducer = jsonProducer;
    }

    @PostMapping("/publish")
    public ResponseEntity<String> sendJsonMessage(@RequestBody User user){
        jsonProducer.sendJsonMessage(user);
        return ResponseEntity.ok("Json message sent to RabbitMQ ...");
    }
}

/publishエンドポイントでは、POSTメソッドがUserオブジェクトを受け取り、それをRabbitMQにJSONメッセージとして送信します。

Postmanで、Producer API テスト。
 JSONメッセジーが送られた。
クエリパラメータもテストするべきが、ここでは省略します。
ブラウザのURLでテストできます。
RabbitMQ UIで送られたJSONメッセジーを確認できます。
User DTOのメッセージを得ました。
Exchangesを確認すると、
JSON用のルーティングキーをもって送られた。


7. Consumerの作成

テキストメッセージを受信するための RabbitMQConsumer
JSONメッセージを受信するための RabbitMQJsonConsumer
@Service
public class RabbitMQConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQConsumer.class);

    @RabbitListener(queues = {"${rabbitmq.queue.name}"})
    public void consume(String message){
        LOGGER.info(String.format("Received message -> %s", message));
    }
}

@RabbitListener アノテーションを使用して、指定されたRabbitMQキューからメッセージを受信するメソッドを定義しています。

メソッドはテキストメッセージを受け取り、ロガーを使用して受信したメッセージをログに記録します。

@Service
public class RabbitMQJsonConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonConsumer.class);

    @RabbitListener(queues = {"${rabbitmq.queue.json.name}"})
    public void consumeJsonMessage(User user){
        LOGGER.info(String.format("Received JSON message -> %s", user.toString()));
    }
}

@RabbitListener アノテーションを使用して、指定されたRabbitMQキューからJSONメッセージを受信するメソッドを定義しています。

メソッドは User オブジェクトを受け取り、ロガーを使用して受信したJSONメッセージをログに記録します。

Postmanでリクエストを送ります。
Producerも、Consumerも、うまくできた~!


今日の勉強もコミット・プシュしました。


最後に


RabbitMQの利用経験を振り返ってみると、このメッセージブローカーが提供するシンプルで使いやすい機能に感銘を受けました。ただし、大規模なイベント駆動型アーキテクチャやストリーミングデータの処理を求める場合は、Kafkaがより適していると感じました。なぜなら、「転送されたメッセージに対する揮発性」のせいで、メッセジーの伝送保証としては、Kafkaの方が信頼性があるためです。しかしながら、Kafkaよりも初期設定が早くでき、管理や運用が便利であるという点で、明らかな利点があると思う。

エンジニアファーストの会社 株式会社CRE-CO
ソンさん



【参考】


いいなと思ったら応援しよう!