
【イベントドリブンアーキテクチャ】Kafkaにより三つのマイクロサービスの実装 - Springboot3 第15回
はじめに
こんにちは、今日は前回に続きKafkaを使ったマイクロサービス構築実習を続けます。 今回はイベントドリブンアーキテクチャについて説明します。 イベントドリブンアーキテクチャは特にマイクロサービスアーキテクチャとの結合がよく行われます。このアーキテクチャはシステムの各機能やサービスが独立して動作し、非同期的なイベントメッセージを通じて互いに通信することで柔軟性と拡張性を高めます。

イベントドリブンアーキテクチャは、システム間のイベントベースの通信を重視し、主にマイクロサービスアーキテクチャと組み合わせて使用されます。互いに全く同じように見えますが、相互に概念的に区別する必要があります。
イベントドリブンアーキテクチャは全体的なシステムアーキテクチャのパラダイムを表す概念であり、マイクロサービスアーキテクチャはこれを実際に適用する具体的な方法の一つです。Kafkaは、このようなアーキテクチャでイベントを安全に送信し、処理するために使用されるツールです。そして、Kafkaではなくても、RabbitMQ、activeMQなども使えます。
実装過程

ブローカーに相当するのがカフカ、
コンシューマーに該当するのが在庫サービスとメールサービス。 それぞれ独立したサービスを維持し、
これがマイクロサービスであると言える。
その中で注文イベントを生成すると、
カフカブローカーを通じて
各サービスが注文イベントを消費する。
1.四つのマイクロサービスの生成






これら3つのSpring Bootプロジェクトを、ポートの異なる3つのtomcatサーバーで実行する必要があります。

プロジェクトを実行する必要はないです。
email-serviceにserver.port=8082を入力します。

stock-serviceにはserver.port=8081を入力します。
2.Base-Domains: DTO(Order, OrderEvent)の生成

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
private String orderId;
private String name;
private int qty;
private double price;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderEvent {
private String message;
private String status;
private Order order;
}
Order Eventを別途作成する理由はメッセージベースの通信のためです、 注文イベントDTOを別途定義することで、注文サービスは注文イベントを生成し、そのイベントをメッセージキューまたはイベントストリームに送信することができます。他のサービスは、注文イベントを購読して必要なアクションを実行することができます。これは緩やかな結合を提供し、各サービスが独立して拡張できるようにします。
3.Order-Service: Configure Producer・Topic・Producer・APIの生成
まず、ZookeperとKafkaサーバーを起動します。

.\bin\windows\kafka-server-start.bat .\config\server.properties Kafkaブロッカーの実行
Kafkaサーバーはzookeeperに依存しているため、Kafkaサーバーを実行する場合は必ずzookeeperサービスを最初に実行してください。
Configure Producer

カフカサーバーと通信するポートを設定し、
キーと値のシリアライザーを設定し、
最後にトピック名を照合します。
spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.topic.name=order_topics
Configure Topic

外部化されたtopNameが印象的です。
リファクタリングしかくでも、OKです。
@Configuration
public class KafkaTopicConfig {
@Value("${spring.kafka.topic.name}")
private String topicName;
// spring bean for kafka topic
@Bean
public NewTopic topic(){
return TopicBuilder.name(topicName)
.build();
}
}
Producer

で、base-domainsの依存性をorder-serviceに追加します。

KafkaTemplateとsendMessageにOrder Eventが利用されます。
Order Eventはbase-serviceのDTOです。
@Service
public class OrderProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderProducer.class);
private NewTopic topic;
private KafkaTemplate<String, OrderEvent> kafkaTemplate;
public OrderProducer(NewTopic topic, KafkaTemplate<String, OrderEvent> kafkaTemplate) {
this.topic = topic;
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(OrderEvent event){
LOGGER.info(String.format("Order event => %s", event.toString()));
// create Message
Message<OrderEvent> message = MessageBuilder
.withPayload(event)
.setHeader(KafkaHeaders.TOPIC, topic.name())
.build();
kafkaTemplate.send(message);
}
}
「.withPayload(event)」はSpring IntegrationやSpring Messagingで使うメッセージ生成メソッドの一つです。 特にSpring Integrationでメッセージのペイロードを設定する時使います。
ここでeventはメッセージのペイロードで設定されるオブジェクトです。メッセージペイロードはメッセージの主な内容で、実際のデータがここに入ります。「.withPayload(event)」を呼び出すと、eventオブジェクトがメッセージのペイロードに設定され、このメッセージは後からメッセージキューを通じて送信されたり、他のコンポーネントに伝達されます。
では、一体Payloadとはなんでしょうか?「ペイロード(payload)」とは、データ転送で実際に転送されるデータを指します。例えば、HTTP通信でHTTPリクエストのヘッダー(header)にはリクエストのメタデータが入り、HTTPリクエストのボディ(body)には実際に送信されるデータ、つまりペイロードが入ります。
API

@RestController
@RequestMapping("/api/v1")
public class OrderController {
private OrderProducer orderProducer;
public OrderController(OrderProducer orderProducer) {
this.orderProducer = orderProducer;
}
@PostMapping("/orders")
public String placeOrder(@RequestBody Order order){
order.setOrderId(UUID.randomUUID().toString());
OrderEvent orderEvent = new OrderEvent();
orderEvent.setStatus("PENDING");
orderEvent.setMessage("order status is in pending state");
orderEvent.setOrder(order);
orderProducer.sendMessage(orderEvent);
return "Order placed successfully ...";
}
}
注文が生成されると、OrderProducerを使用してKafkaに注文イベントを送信します。
order.setOrderId(UUID.randomUUID().toString())は注文に固有の注文IDを割り当てるコードです。
OrderEventオブジェクトを生成し、これを特定の状態("PENDING"とメッセージと共に初期化します。
orderProducer.sendMessage(orderEvent)を呼び出してKafkaに注文イベントを送信します。



Order Eventが表示されます。

正常に動作していることが確認できます。

4.Stock-Service: Configure, Consumer
今回はコンシューマーとして在庫サービスを作成してみましょう。

server.port=8081
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: stock
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.topic.name=order_topics
spring.kafka.consumer.group-id: stock: Kafkaコンシューマーグループの識別子を"stock"に設定します。
spring.kafka.consumer.auto-offset-reset: earliest: オフセット初期化を最も初期に設定することで、コンシューマーグループが最初からメッセージを読めるようにします。
spring.kafka.consumer.properties.spring.json.trusted.packages=*: JSONデシリアライザーの信頼できるパッケージを設定します。全てのパッケージを許可します。
spring.kafka.topic.name=order_topics: 使用するKafkaトピックの名前を「order_topics」に設定します。


@Service
public class OrderConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderConsumer.class);
@KafkaListener(
topics = "${spring.kafka.topic.name}"
,groupId = "${spring.kafka.consumer.group-id}"
)
public void consume(OrderEvent event){
LOGGER.info(String.format("Order event received in stock service => %s", event.toString()));
// save the order event into the database
}
}





5.Email-Service: Configure, Consumer
次はメールサービスの設定をしてみましょう。実質的に在庫サービスと同じ作業なので難しくありません。

他のものはポート番号とグループIDくらいですね。
server.port=8082
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: email
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.topic.name=order_topics

依存関係を追加します。
何を依存しているかというと、
図のようにemail-serviceのOrderConsumerが
OrderEventを使っているので、
OrderEventをDTOとして持つbase-domainsに依存しています。
@Service
public class OrderConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(OrderConsumer.class);
@KafkaListener(
topics = "${spring.kafka.topic.name}"
,groupId = "${spring.kafka.consumer.group-id}"
)
public void consume(OrderEvent event){
LOGGER.info(String.format("Order event received in email service => %s", event.toString()));
// send an email to the customer
}
}
6. テスト

ポストマンでテストしてみましょう。
JSONボディ(ペイロード)に適当な値を入力して送信します。

コンシューマーであるStockSerivceとEmailServiceに注文イベントがカフカブローカーから無事に届きました。
今日の勉強もコミット・プシュしました!
最後に
この記事では三つの独立したサービス(注文、在庫、メール)を構築し、各サービス間の通信をKafkaを使って行うように説明しました。 また、注文イベントを生成し、Kafkaを使って各サービスがそのイベントを購読するように実装しました。
マイクロサービス間の通信では、注文と関連したイベントを別のDTOで定義し、これをイベント生成と消費に活用して緩い結合を維持しています。
このような素晴らしいアーキテクチャはNetflix, Uber, Airbnbなど様々な大手IT企業が活用しています。それだけ安定性の面で検証されたアーキテクチャということなので、私も大規模なシステム構築やメンテナンスプロジェクトに入るなら、このようなアーキテクチャを知っておいた方がいいと思います。では、次回はKafka以外にもよく使うメッセージブローカーであるRabbitMQについて勉強します!
エンジニアファーストの会社 株式会社CRE-CO
ソンさん
【参考】
[Udemy] Building Microservices with Spring Boot & Spring Cloud