【メッセージ・ブローカー】RabbitMQを通じてメッセージキューの理解と実践 - Springboot3 第16回
はじめに
こんにちは、今日はメッセージブローカーで有名なRabbitMQについて紹介したいと思います。 RabbitMQは分散アプリケーションやサービス間での非同期メッセージングを可能にするために使用されます。RabbitMQは、AMQP(Advanced Message Queuing Protocol)プロトコルを基に設計されています。
AMQPは、メッセージ指向ミドルウェア通信のためのオープンな標準プロトコルです。分散システム間の効率的で安定した通信のために設計されました。 Producer, Queue, Consumer, Exchangeが重要概念です。
RabbitMQの重要概念
Producer: メッセージを送るアプリケーション。コンシューマーに直接送るのではなく、RabbitMQブローカーに送ります。
Consumer: メッセージを読むアプリケーション
Queue: メッセージを格納するバッファまたはストレージ。メッセージが読み込まれるとキューからメッセージが削除されるので、メッセージは正確に一度だけ処理されます。
Message: ProducerからRabbitMQを介してConsumerに送られる情報
Exchange: Producerとキューの仲介者。キューに直接メッセージを送る代わりに、Producerがexchangeにメッセージを送ると、exchangeはキューに一つ以上のキューにメッセージを送ります。このとき、特定のルールに従います。そのため、Producerはメッセージを受け取るキューが何であるかを知る必要はありません。
Routing Key: exchangeがメッセージをキューにどのようにルーティングするかを決定するためのキー。メッセージのアドレスと考えることができます。
Binding: exchange と queue の間のリンク。ルーティングキーを利用して決定されます。
Kafka VS RabbitMQ
1.方式の違い
Kafkaはpub/sub方式、RabbitMQはメッセージブローカー方式です。Kafkaのpub/sub方式は生産者中心の設計で構成されています。 生成者が希望する各メッセージを公開できるようにするメッセージ配布パターンで進行されます。
一方、RabbitMQのメッセージブローカー方式はブローカー中心の設計で構成されます。指定された受信者にメッセージを確認、ルーティング、保存及び配信する役割を遂行し、メッセージ伝達の保証に焦点を当てます。
2.転送されたメッセージに対する揮発性
RabbitMQはqueueに保存されていたメッセージに対してEvent Consumerが持ち去られるとqueueからそのメッセージを削除します。
しかし、Kafkaはコンストラクタからメッセージが来ると、そのメッセージをtopicに分類し、これをevent streamerに保存します。 その後、受信者が特定のtopicに対するメッセージを取っても、event streamerはそのtopicを継続的に維持するため、特定の状況が発生しても再生が可能です。
3.用途の違い
Kafkaはクラスターを通じた並列処理が主な差別点であるため、膨大な量のデータを処理する時、利点が浮き彫りになります。
RabbitMQはデータ処理よりManage UIを提供するため、管理的な側面や、様々な機能実装のためのサービスを構築する時、メリットが浮き彫りになります。
実装過程
1. DockerによりRabbitMQのインストール
RabbitMQ管理UIを使用したRabbitMQの探索
RabbitMQ管理UIは非常に便利なRabbitMQ管理プラグインです。
2. Spring Boot プロジェクトの準備
3. Spring BootとRabbitMQの連結
4. RabbitMQの設定
@Configuration
public class RabbitMQConfig {
@Value("${rabbitmq.queue.name}")
private String queue;
@Value("${rabbitmq.queue.json.name}")
private String jsonQueue;
@Value("${rabbitmq.exchange.name}")
private String exchange;
@Value("${rabbitmq.routing.key}")
private String routingKey;
@Value("${rabbitmq.routing.json.key}")
private String routingJsonKey;
// spring bean for rabbitmq queue
@Bean
public Queue queue(){
return new Queue(queue);
}
// spring bean for queue (store json messages)
@Bean
public Queue jsonQueue(){
return new Queue(jsonQueue);
}
// spring bean for rabbitmq exchange
@Bean
public TopicExchange exchange(){
return new TopicExchange(exchange);
}
// binding between queue and exchange using routing key
@Bean
public Binding binding(){
return BindingBuilder
.bind(queue())
.to(exchange())
.with(routingKey);
}
// binding between json queue and exchange using routing key
@Bean
public Binding jsonBinding(){
return BindingBuilder
.bind(jsonQueue())
.to(exchange())
.with(routingJsonKey);
}
@Bean
public MessageConverter converter(){
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpTemplate amqpTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(converter());
return rabbitTemplate;
}
}
プロパティの構成:
クラスは @Value アノテーションを使用してアプリケーションプロパティから値をインジェクションします(rabbitmq.queue.name、rabbitmq.queue.json.name、rabbitmq.exchange.name、rabbitmq.routing.key、rabbitmq.routing.json.key)。
キューの構成:
queue() メソッド:queue プロパティを使用して非JSONキューのBeanを定義します。
jsonQueue() メソッド:jsonQueue プロパティを使用してJSONキューのBeanを定義します。
エクスチェンジの構成:
exchange() メソッド:exchange プロパティを使用してトピックエクスチェンジのBeanを定義します。
バインディングの構成:
binding() メソッド:routingKey を使用して非JSONキューとエクスチェンジの間のバインディングを定義します。
jsonBinding() メソッド:routingJsonKey を使用してJSONキューとエクスチェンジの間のバインディングを定義します。
メッセージコンバータの構成:
converter() メソッド:JavaオブジェクトをJSON形式に変換するために使用される Jackson2JsonMessageConverter のBeanを定義します。
AMQPテンプレートの構成:
amqpTemplate() メソッド: ConnectionFactory を使用して構成された RabbitTemplate のBeanを定義します。このテンプレートはメッセージ変換に Jackson2JsonMessageConverter を使用します。
@Data
public class User {
private int id;
private String firstName;
private String lastName;
}
5. Producerの作成
RabbitMQキューにメッセージを生成するための2つのクラス、RabbitMQProducerとRabbitMQJsonProducerです
@Service
public class RabbitMQProducer {
@Value("${rabbitmq.exchange.name}")
private String exchange;
@Value("${rabbitmq.routing.key}")
private String routingKey;
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQProducer.class);
private RabbitTemplate rabbitTemplate;
public RabbitMQProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage(String message){
LOGGER.info(String.format("전송된 메시지 -> %s", message));
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}
sendMessageメソッド
このメソッドはインジェクトされたRabbitTemplateを使用してプレーンテキストメッセージをRabbitMQの交換に送信します。
package net.javaguides.springbootrabbitmqtutorial.publisher;
import net.javaguides.springbootrabbitmqtutorial.dto.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
public class RabbitMQJsonProducer {
@Value("${rabbitmq.exchange.name}")
private String exchange;
@Value("${rabbitmq.routing.json.key}")
private String routingJsonKey;
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonProducer.class);
private RabbitTemplate rabbitTemplate;
public RabbitMQJsonProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendJsonMessage(User user){
LOGGER.info(String.format("Json message sent -> %s", user.toString()));
rabbitTemplate.convertAndSend(exchange, routingJsonKey, user);
}
}
sendJsonMessageメソッド
このメソッドはJSONメッセージ(Userオブジェクト)をRabbitMQの交換に送信します。インジェクトされたRabbitTemplateを使用します。
6. APIの作成
@RestController
@RequestMapping("/api/v1")
public class MessageController {
private RabbitMQProducer producer;
public MessageController(RabbitMQProducer producer) {
this.producer = producer;
}
// http://localhost:8080/api/v1/publish?message=hello
@GetMapping("/publish")
public ResponseEntity<String> sendMessage(@RequestParam("message") String message){
producer.sendMessage(message);
return ResponseEntity.ok("Message sent to RabbitMQ ...");
}
}
/publishエンドポイントでは、GETメソッドがmessageクエリパラメータを受け取り、それをRabbitMQに送信します。
@RestController
@RequestMapping("/api/v1")
public class MessageJsonController {
private RabbitMQJsonProducer jsonProducer;
public MessageJsonController(RabbitMQJsonProducer jsonProducer) {
this.jsonProducer = jsonProducer;
}
@PostMapping("/publish")
public ResponseEntity<String> sendJsonMessage(@RequestBody User user){
jsonProducer.sendJsonMessage(user);
return ResponseEntity.ok("Json message sent to RabbitMQ ...");
}
}
/publishエンドポイントでは、POSTメソッドがUserオブジェクトを受け取り、それをRabbitMQにJSONメッセージとして送信します。
7. Consumerの作成
@Service
public class RabbitMQConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQConsumer.class);
@RabbitListener(queues = {"${rabbitmq.queue.name}"})
public void consume(String message){
LOGGER.info(String.format("Received message -> %s", message));
}
}
@RabbitListener アノテーションを使用して、指定されたRabbitMQキューからメッセージを受信するメソッドを定義しています。
メソッドはテキストメッセージを受け取り、ロガーを使用して受信したメッセージをログに記録します。
@Service
public class RabbitMQJsonConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQJsonConsumer.class);
@RabbitListener(queues = {"${rabbitmq.queue.json.name}"})
public void consumeJsonMessage(User user){
LOGGER.info(String.format("Received JSON message -> %s", user.toString()));
}
}
@RabbitListener アノテーションを使用して、指定されたRabbitMQキューからJSONメッセージを受信するメソッドを定義しています。
メソッドは User オブジェクトを受け取り、ロガーを使用して受信したJSONメッセージをログに記録します。
最後に
RabbitMQの利用経験を振り返ってみると、このメッセージブローカーが提供するシンプルで使いやすい機能に感銘を受けました。ただし、大規模なイベント駆動型アーキテクチャやストリーミングデータの処理を求める場合は、Kafkaがより適していると感じました。なぜなら、「転送されたメッセージに対する揮発性」のせいで、メッセジーの伝送保証としては、Kafkaの方が信頼性があるためです。しかしながら、Kafkaよりも初期設定が早くでき、管理や運用が便利であるという点で、明らかな利点があると思う。
エンジニアファーストの会社 株式会社CRE-CO
ソンさん
【参考】
[Udemy] Building Microservices with Spring Boot & Spring Cloud
https://velog.io/@cho876/%EC%B9%B4%ED%94%84%EC%B9%B4kafka-vs-RabbitMQ