見出し画像

【イベントドリブンアーキテクチャ】Kafkaにより三つのマイクロサービスの実装 - Springboot3 第15回


はじめに


こんにちは、今日は前回に続きKafkaを使ったマイクロサービス構築実習を続けます。 今回はイベントドリブンアーキテクチャについて説明します。 イベントドリブンアーキテクチャは特にマイクロサービスアーキテクチャとの結合がよく行われます。このアーキテクチャはシステムの各機能やサービスが独立して動作し、非同期的なイベントメッセージを通じて互いに通信することで柔軟性と拡張性を高めます。

イベントドリブンアーキテクチャはすべてのアプリケーションは疎結合であり、互いに独立してます。アプリケーションは非同期通信のためにメッセージ・ブローカーを使用します。

イベントドリブンアーキテクチャは、システム間のイベントベースの通信を重視し、主にマイクロサービスアーキテクチャと組み合わせて使用されます。互いに全く同じように見えますが、相互に概念的に区別する必要があります。

イベントドリブンアーキテクチャは全体的なシステムアーキテクチャのパラダイムを表す概念であり、マイクロサービスアーキテクチャはこれを実際に適用する具体的な方法の一つです。Kafkaは、このようなアーキテクチャでイベントを安全に送信し、処理するために使用されるツールです。そして、Kafkaではなくても、RabbitMQ、activeMQなども使えます。


実装過程


プロデューサーに該当するのがオーダーサービス、
ブローカーに相当するのがカフカ、
コンシューマーに該当するのが在庫サービスとメールサービス。 それぞれ独立したサービスを維持し、
これがマイクロサービスであると言える。

その中で注文イベントを生成すると、
カフカブローカーを通じて
各サービスが注文イベントを消費する。

1.四つのマイクロサービスの生成

 order-serviceの生成
stock-serviceの生成
 order-serviceの生成
base-domainsの生成
2つの独立したモジュールを呼び出します。

これら3つのSpring Bootプロジェクトを、ポートの異なる3つのtomcatサーバーで実行する必要があります。

base-domainsはスタンドアローン・プロジェクトなので、
プロジェクトを実行する必要はないです。
email-serviceにserver.port=8082を入力します。
order-serviceはserver.port=8080で、そのまま。
stock-serviceにはserver.port=8081を入力します。


2.Base-Domains: DTO(Order, OrderEvent)の生成

base-domainsにOrderとOrderEventを作成します。
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    private String orderId;
    private String name;
    private int qty;
    private double price;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderEvent {
    private String message;
    private String status;
    private Order order;
}

Order Eventを別途作成する理由はメッセージベースの通信のためです、 注文イベントDTOを別途定義することで、注文サービスは注文イベントを生成し、そのイベントをメッセージキューまたはイベントストリームに送信することができます。他のサービスは、注文イベントを購読して必要なアクションを実行することができます。これは緩やかな結合を提供し、各サービスが独立して拡張できるようにします。


3.Order-Service: Configure Producer・Topic・Producer・APIの生成

まず、ZookeperとKafkaサーバーを起動します。

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties Zookeeperの実行

.\bin\windows\kafka-server-start.bat .\config\server.properties Kafkaブロッカーの実行

Kafkaサーバーはzookeeperに依存しているため、Kafkaサーバーを実行する場合は必ずzookeeperサービスを最初に実行してください。


Configure Producer

プロデューサーであるorder-serviceの設定を行います。
カフカサーバーと通信するポートを設定し、
キーと値のシリアライザーを設定し、
最後にトピック名を照合します。
spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.topic.name=order_topics


Configure Topic

KafkaTopicConfigクラスの作成。
外部化されたtopNameが印象的です。
リファクタリングしかくでも、OKです。
@Configuration
public class KafkaTopicConfig {

    @Value("${spring.kafka.topic.name}")
    private String topicName;

    // spring bean for kafka topic
    @Bean
    public NewTopic topic(){
        return TopicBuilder.name(topicName)
                .build();
    }
}


Producer

  order-serviceは基本的にbase-domainsに依存しています。
で、base-domainsの依存性をorder-serviceに追加します。
OrderProducerクラスを見ると、
KafkaTemplateとsendMessageにOrder Eventが利用されます。
Order Eventはbase-serviceのDTOです。
@Service
public class OrderProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderProducer.class);

    private NewTopic topic;

    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public OrderProducer(NewTopic topic, KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.topic = topic;
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(OrderEvent event){
        LOGGER.info(String.format("Order event => %s", event.toString()));

        // create Message
        Message<OrderEvent> message = MessageBuilder
                .withPayload(event)
                .setHeader(KafkaHeaders.TOPIC, topic.name())
                .build();
        kafkaTemplate.send(message);
    }
}

「.withPayload(event)」はSpring IntegrationやSpring Messagingで使うメッセージ生成メソッドの一つです。 特にSpring Integrationでメッセージのペイロードを設定する時使います。

ここでeventはメッセージのペイロードで設定されるオブジェクトです。メッセージペイロードはメッセージの主な内容で、実際のデータがここに入ります。「.withPayload(event)」を呼び出すと、eventオブジェクトがメッセージのペイロードに設定され、このメッセージは後からメッセージキューを通じて送信されたり、他のコンポーネントに伝達されます。

では、一体Payloadとはなんでしょうか?「ペイロード(payload)」とは、データ転送で実際に転送されるデータを指します。例えば、HTTP通信でHTTPリクエストのヘッダー(header)にはリクエストのメタデータが入り、HTTPリクエストのボディ(body)には実際に送信されるデータ、つまりペイロードが入ります。


API

@RestController
@RequestMapping("/api/v1")
public class OrderController {

    private OrderProducer orderProducer;

    public OrderController(OrderProducer orderProducer) {
        this.orderProducer = orderProducer;
    }

    @PostMapping("/orders")
    public String placeOrder(@RequestBody Order order){

        order.setOrderId(UUID.randomUUID().toString());

        OrderEvent orderEvent = new OrderEvent();
        orderEvent.setStatus("PENDING");
        orderEvent.setMessage("order status is in pending state");
        orderEvent.setOrder(order);

        orderProducer.sendMessage(orderEvent);

        return "Order placed successfully ...";
    }
}

注文が生成されると、OrderProducerを使用してKafkaに注文イベントを送信します。 

order.setOrderId(UUID.randomUUID().toString())は注文に固有の注文IDを割り当てるコードです。

OrderEventオブジェクトを生成し、これを特定の状態("PENDING"とメッセージと共に初期化します。
orderProducer.sendMessage(orderEvent)を呼び出してKafkaに注文イベントを送信します。

order-serviceの起動。
Postmanでリクエストを行います。
コンソールを見ると、
Order Eventが表示されます。
キーと値のシリアライズが
正常に動作していることが確認できます。


4.Stock-Service: Configure, Consumer

今回はコンシューマーとして在庫サービスを作成してみましょう。

stock-serviceのコンシューマー設定値を入力します。
server.port=8081
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: stock
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.topic.name=order_topics

spring.kafka.consumer.group-id: stock: Kafkaコンシューマーグループの識別子を"stock"に設定します。
spring.kafka.consumer.auto-offset-reset: earliest: オフセット初期化を最も初期に設定することで、コンシューマーグループが最初からメッセージを読めるようにします。
spring.kafka.consumer.properties.spring.json.trusted.packages=*: JSONデシリアライザーの信頼できるパッケージを設定します。全てのパッケージを許可します。
spring.kafka.topic.name=order_topics: 使用するKafkaトピックの名前を「order_topics」に設定します。

先ほどと同様に在庫サービスも注文イベントをbase-domainsに依存しているので、依存関係を追加します。
OrderConsumerクラスコードを下記のように作成します。
@Service
public class OrderConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderConsumer.class);

    @KafkaListener(
            topics = "${spring.kafka.topic.name}"
            ,groupId = "${spring.kafka.consumer.group-id}"
    )
    public void consume(OrderEvent event){
        LOGGER.info(String.format("Order event received in stock service => %s", event.toString()));

        // save the order event into the database
    }
}
stock-serviceの起動。
キーと値のデシリアライズがうまく機能します。
コンシューマーが購読しているトピックの名前も確認できます。
プロデューサーがカフカブローカーに送った注文イベントがコンシューマーに届き、その内容もコンソールで確認できます。


5.Email-Service: Configure, Consumer 

次はメールサービスの設定をしてみましょう。実質的に在庫サービスと同じ作業なので難しくありません。

email-serviceの設定をします。
他のものはポート番号とグループIDくらいですね。
server.port=8082
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: email
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.topic.name=order_topics
email-serviceもbase-domainsに依存しているので、
依存関係を追加します。
何を依存しているかというと、
図のようにemail-serviceのOrderConsumerが
OrderEventを使っているので、
OrderEventをDTOとして持つbase-domainsに依存しています。
@Service
public class OrderConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderConsumer.class);

    @KafkaListener(
            topics = "${spring.kafka.topic.name}"
            ,groupId = "${spring.kafka.consumer.group-id}"
    )
    public void consume(OrderEvent event){
        LOGGER.info(String.format("Order event received in email service => %s", event.toString()));

        // send an email to the customer
    }
}


6. テスト 

さあ、もう終わりです。
ポストマンでテストしてみましょう。
JSONボディ(ペイロード)に適当な値を入力して送信します。
プロデューサーであるOrderServiceに「Powerful Computer Order!!!!」と表示されました。 これはカフカブローカーに送信されたものです。
コンシューマーであるStockSerivceとEmailServiceに注文イベントがカフカブローカーから無事に届きました。


今日の勉強もコミット・プシュしました!


最後に


この記事では三つの独立したサービス(注文、在庫、メール)を構築し、各サービス間の通信をKafkaを使って行うように説明しました。 また、注文イベントを生成し、Kafkaを使って各サービスがそのイベントを購読するように実装しました。

マイクロサービス間の通信では、注文と関連したイベントを別のDTOで定義し、これをイベント生成と消費に活用して緩い結合を維持しています。
このような素晴らしいアーキテクチャはNetflix, Uber, Airbnbなど様々な大手IT企業が活用しています。それだけ安定性の面で検証されたアーキテクチャということなので、私も大規模なシステム構築やメンテナンスプロジェクトに入るなら、このようなアーキテクチャを知っておいた方がいいと思います。では、次回はKafka以外にもよく使うメッセージブローカーであるRabbitMQについて勉強します!


エンジニアファーストの会社 株式会社CRE-CO
ソンさん


【参考】


  • [Udemy] Building Microservices with Spring Boot & Spring Cloud

この記事が気に入ったらサポートをしてみませんか?