見出し画像

【大量データを処理する分散メッセージキュー】Apache Kafkaの基本的実装 -Springboot3 第13回



はじめに


こんにちは、今日は大規模のシステムで使われるApach Kafkaについて勉強します!Kafkaは大量データを処理する分散メッセージキュー・システムです。

Kafkaを簡単に紹介する動画を見ましょう。
(英語)


Apache Kafkaの概要



アパッチ・カフカは、分散型のリアルタイムデータストリーム処理システムで、以下のような構造を持っています

  1. プロデューサー(Producer): データを生成し、それをカフカに送信する役割を果たします。例えば、ログデータやセンサーデータを生成するシステムがプロデューサーとなります。

  2. ブローカー(Broker): カフカのサーバーで、メッセージを保存し管理します。複数のブローカーがクラスタを構成し、データの分散処理をサポートします。

  3. トピック(Topic): データのカテゴリを意味し、メッセージが属するテーマです。例えば、「ログ」、「イベント」などがトピックとなります。

  4. パーティション(Partition): 各トピックは1つ以上のパーティションに分割されます。パーティションはデータの物理的な分離を意味し、並列処理と安定性を提供します。

  5. オフセット(Offset):メッセージストリーム内の各メッセージを識別するための概念です。各メッセージには、オフセットと呼ばれる固有の識別子が割り当てられています。これはメッセージが属するパーティション内での位置を示します。

  6. ズーキーパー(Zookeeper): カフカクラスタの構成やリーダーの選出などを管理する分散コーディネータです。

  7. コンシューマー(Consumer): データを消費し、処理する役割を果たします。複数のコンシューマーが同時に1つのトピックからメッセージを消費できます。


要約すると、プロデューサーがデータを生成して特定のトピックに送信します。
ブローカーがそのデータを管理し、トピックのパーティションに分配します。
その後、コンシューマーは特定のトピックのパーティションからデータを消費して処理します。


Apache Kafkaの実装過程


1.Kafkaのインストールとプロジェクト生成

「QUICK START」をクリック
「Download」をクリック
下線のURLをクリック
圧縮されているファイルを解凍します。
「Zookeper」を起動するために、
Powershellで以下のコマンドを入力します。
  .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
2181が表示されたら、OK
「Kafkaサーバー」を起動するために、
Powershellで以下のコマンドを入力します。
.\bin\windows\kafka-server-start.bat .\config\server.properties
2181が表示されたら、OK
.\bin\windows\kafka-topics.bat --create --topic topic-example --bootstrap-server localhost:9092

上記のコマンドで「topic」を作りましょう。
--create: トピックを作成するオプションです。
--topic topic-example: 作成するトピックの名前を指定します。
--bootstrap-server localhost:9092: Kafkaクラスタへの接続情報を指定しま。

.\bin\windows\kafka-console-producer.bat --topic topic-example --bootstrap-server localhost:9092

このコマンドは、Kafkaのコンソールプロデューサーを起動するためのものです。指定されたトピック(topic-example)およびブートストラップサーバー(localhost:9092)にメッセージを送信します。

いろいろ文字列を入力します。
「Ctrl+Z」で戻ります。
以下のコマンドでコンシューマーの「topic」
.\bin\windows\kafka-console-consumer.bat --topic topic-example --from-beginning --bootstrap-server localhost:9092
このコマンドは、Kafkaのコンソールコンシューマーを起動し、topic-exampleというトピックからメッセージを読み取ります。

いままでのコマンドをおさらいします!

STEP 1: DOWNLOAD AND INSTALL KAFKA
https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz

STEP 2: START THE KAFKA ENVIRONMENT
# Start the ZooKeeper service
C:\Users\***\Downloads\kafka>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

# Start the Kafka broker service
C:\Users\***\Downloads\kafka>.\bin\windows\kafka-server-start.bat .\config\server.properties

STEP 3: CREATE A TOPIC TO STORE YOUR EVENTS
C:\Users\***\Downloads\kafka>.\bin\windows\kafka-topics.bat --create --topic topic_demo --bootstrap-server localhost:9092

STEP 4: WRITE SOME EVENTS INTO THE TOPIC
C:\Users\***\Downloads\kafka>.\bin\windows\kafka-console-producer.bat --topic topic_demo --bootstrap-server localhost:9092
>hello world
>topic demo

STEP 5:  READ THE EVENTS
C:\Users\***\Downloads\kafka>.\bin\windows\kafka-console-consumer.bat --topic topic_demo --from-beginning --bootstrap-server localhost:9092
hello world
topic demo


「https://start.spring.io/」にアクセスし、
「Spring Web」と「Spring for Apache Kafka」依存性を追加。
プロジェクトをダウンロードし、
Intellijで読み込む。
アプリ起動!


2.ProducerとConsumerの設定

Kafkaの公式ドキュメントを確認します。

そこで、基本的設定方法があります。


 application.propertiesに以下の設定をします。

spring.kafka.consumer.bootstrap-severs: logcalhost:9092
spring.kafka.consumer.group-id: myGroup
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer:org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value -serializer:org.apache.kafka.common.serialization.StringSerializer

コンシューマーの設定

  1. spring.kafka.consumer.bootstrap-servers: logcalhost:9092

    • コンシューマが接続するKafkaブローカーのアドレスを指定します。ここではlocalhostのポート9092にあるブローカーに接続します。

  2. spring.kafka.consumer.group-id: myGroup

    • コンシューマのグループIDを設定します。グループIDは処理を並列化し複数のコンシューマ間でタスクを分散するために使用されます。

  3. spring.kafka.consumer.auto-offset-reset: earliest

    • 初期オフセットがないか、または現在のオフセットがサーバー上に存在しない場合の動作を決定します。ここでは最初のオフセットからデータを読むように設定されています。

  4. spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    • Kafkaメッセージのキーを逆シリアライズするためのクラスを指定します。ここではキーを文字列として逆シリアライズします。

  5. spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    • Kafkaメッセージの値を逆シリアライズするためのクラスを指定します。値を文字列として逆シリアライズします。

プロデューサの設定

  1. spring.kafka.producer.bootstrap-servers: localhost:9092

    • プロデューサが接続するKafkaブローカーのアドレスを指定します。

  2. spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringDeserializer

    • Kafkaメッセージのキーをシリアライズするためのクラスを指定します。ここではキーを文字列としてシリアライズします。

  3. spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringDeserializer

    • Kafkaメッセージの値をシリアライズするためのクラスを指定します。ここでは値を文字列としてシリアライズします。


3.Topic・Producer・SendMessageController生成

「Topic」として、config/KafkaTopicConfig.javaを作成します。
@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic javaguidesTopic(){
        return TopicBuilder.name("javaguides")
                .build();
    }
}
「Producer」として、KafkaProducerを作成します。
@Service
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    private KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message){
        LOGGER.info(String.format("Message sent %s", message));
        kafkaTemplate.send("javaguides", message);
    }
}
  1. private static final Logger LOGGER: ロギング用のLoggerオブジェクトを初期化しています。これは、メッセージが送信されたことをログに記録します。

  2. private KafkaTemplate<String, String> kafkaTemplate: Kafkaメッセージを送信するために使用される KafkaTemplate オブジェクトを保持しています。

  3. public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate): コンストラクタです。KafkaTemplate オブジェクトを受け取り、インスタンス変数に設定します。

  4. public void sendMessage(String message): メッセージを送信するためのメソッドです。メソッドは引数としてメッセージを受け取り、それをKafkaのトピックに送信します。

  5. LOGGER.info(String.format("Message sent %s", message));: メッセージが正常に送信されたことをログに記録しています。

  6. kafkaTemplate.send("javaguides", message);: kafkaTemplate を使用して、指定されたトピック("javaguides")にメッセージを送信しています。

MessageControllerを作成します。
@RestController
@RequestMapping("/api/v1/kafka")
public class MessageController {

    private KafkaProducer kafkaProducer;

    @Autowired
    public MessageController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    // http:localhost:8080/api/v1/kafka/publish?message=hello world
    @GetMapping("/publish")
    public ResponseEntity<String> publish(@RequestParam("message") String message){
        kafkaProducer.sendMessage(message);
        return ResponseEntity.ok("Message sent to the topic");

    }
}
URLを受け取り、
文字列をリターンします。
https://kafka.apache.org/quickstart
コマンドのtopicでjavaguidesに切り替えます。
bin\windows\kafka-console-consumer.bat --topic javaguides --from-beginning --bootstrap-server localhost:9092

4.Consumerの生成


KafkaConsumerを作成します。
@Service
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = "javaguides", groupId = "myGroup")
    public void consume(String message){
        LOGGER.info(String.format("Message received -> %s", message));

    }
}

@KafkaListener(topics = "javaguides", groupId = "myGroup"): @KafkaListener アノテーションを使用して、指定されたトピック("javaguides")およびグループID("myGroup")でメッセージを受信するメソッドを指定しています。

URLの文字列が表示されます。

5.Producer・ConsumerのJSON SerializerとDeserializer設定

application.properties

spring.kafka.consumer.bootstrap-severs: logcalhost:9092
spring.kafka.consumer.group-id: myGroup
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer:org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer:org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  1. spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

    • コンシューマーの値(メッセージの本文)のデシリアライザを指定しています。

    • ここでは、JsonDeserializerを使用してJSON形式のメッセージをデシリアライズするように設定されています。

  2. spring.kafka.consumer.properties.spring.json.trusted.packages=*

    • JsonDeserializerの設定で、受信したJSONメッセージをデシリアライズする際に信頼されるJavaパッケージを指定しています。

  3. spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

    • プロデューサーの値(メッセージの本文)のシリアライザを指定しています。

    • ここでは、JsonSerializerを使用してJavaオブジェクトをJSON形式に変換するように設定されています。


6.Simple POJOをSerialize/Deserializeへ


payloadパッケージの中にUserクラスを作成します。
public class User {
    private int id;
    private String firstName;
    private String lastName;


    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", firstName='" + firstName + '\'' +
                ", lastName='" + lastName + '\'' +
                '}';
    }
}

toString() メソッド

  • オブジェクトの文字列表現を提供するために toString() メソッドがオーバーライドされています。これにより、オブジェクトを文字列に変換してログなどに表示する際に便利です。

7.JSON Messageを生成するProducer作成

JsonKafkaProducerクラス作成
@Service
public class JsonKafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(JsonKafkaProducer.class);

    private KafkaTemplate<String, User> kafkaTemplate;

    public JsonKafkaProducer(KafkaTemplate<String, User> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(User data) {

        LOGGER.info(String.format("Message sent -> %s", data.toString()));

        Message<User> message = MessageBuilder
                .withPayload(data)
                .setHeader(KafkaHeaders.TOPIC, "javaguides")
                .build();

        kafkaTemplate.send(message);

    }


}

public void sendMessage(User data): JSON形式のメッセージを送信するためのメソッドです。

LOGGER.info(String.format("Message sent -> %s", data.toString()));: メッセージが正常に送信されたことをログに記録しています。

Message<User> message = MessageBuilder...: MessageBuilder を使用して、User オブジェクトをペイロードとして持つKafkaメッセージを構築しています。

kafkaTemplate.send(message);: kafkaTemplate を使用して、構築したメッセージをKafkaに送信しています。

 8.JSONオブジェクトを送るAPI作成

JsonMessageControllerを作成します。
@RestController
@RequestMapping("/api/v1/kafka")
public class JsonMessageController {

    private JsonKafkaProducer kafkaProducer;

    public JsonMessageController(JsonKafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    @PostMapping("/publish")
    public ResponseEntity<String> publish(@RequestBody User user) {
        kafkaProducer.sendMessage(user);
        return ResponseEntity.ok("Json Message sent to kafka topic");

    }
}

この JsonMessageController クラスは、JSON形式のメッセージをKafkaに送信するためのコントローラーです。

HTTP POSTリクエストで受け取ったJSON形式のユーザーオブジェクトを JsonKafkaProducer を使用してKafkaに送信します。メッセージの送信が成功した場合、ResponseEntity を使用してHTTPステータス200(OK)とメッセージを返します。

Postmanでテストして、成功した。
文字列がコマンドで表示されている。
topicをjavaguides_jsonに切り替えし、
うまくいきます。
bin\windows\kafka-console-consumer.bat --topic javaguides_json --from-beginning --bootstrap-server localhost:9092


9.JSON Messageを消費するConsumer作成

JsonKafkaConsumerを作成します。
@Service
public class JsonKafkaConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(JsonKafkaConsumer.class);

    @KafkaListener(topics = "javaguides_json", groupId = "myGroup")
    public void consume(User user) {
        LOGGER.info(String.format("Json message recieved -> %s", user.toString()));
    }
}

“User user” 部分で、Kafkaが提供するJsonDeserializerはUser JsonオブジェクトをJava Userオブジェクトで変換する。

再起動すると、このようなメッセージが表示されます。
Postmanでテストします。
うまくいきます。
コマンドでもうまくいきます。


10.リファクタリング:Topic Nameの外部化(ハードコーディング除去)

 application.propertiesに以下の設定を追加します。
ハードコーディングされているコードを改善します。

spring.kafka.topic.name=javaguides
spring.kafka.topic-json.name=javaguides_json

以下のようにコードを書き直します。(既存のコードはコメント処理)

src/main/java/net/javaguides/springbootkafkatutorial/config/KafkaTopicConfig.java

Configuration
public class KafkaTopicConfig {

    @Value("${spring.kafka.topic.name}")
    private String topicName;

    @Value("${spring.kafka.topic-json.name}")
    private String topicJsonName;

    @Bean
    public NewTopic javaguidesTopic(){
        //return TopicBuilder.name("javaguides")
        return TopicBuilder.name(topicName)
                .build();
    }

    @Bean
    public NewTopic javaguidesJsonTopic(){
        //return TopicBuilder.name("javaguides_json")
        return TopicBuilder.name(topicJsonName)
                .build();
    }
}


src/main/java/net/javaguides/springbootkafkatutorial/kafka/JsonKafkaConsumer.java

@Service
public class JsonKafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(JsonKafkaConsumer.class);

    //@KafkaListener(topics = "javaguides_json", groupId = "myGroup")
    @KafkaListener(topics = "${spring.kafka.topic-json.name}", groupId = "${spring.kafka.consumer.group-id}")
    public void consume(User user) {
        LOGGER.info(String.format("Json message recieved -> %s", user.toString()));
    }


src/main/java/net/javaguides/springbootkafkatutorial/kafka/JsonKafkaProducer.java

@Service
public class JsonKafkaProducer {

    @Value("${spring.kafka.topic-json.name}")
    private String topicJsonName;

    private static final Logger LOGGER = LoggerFactory.getLogger(JsonKafkaProducer.class);

    private KafkaTemplate<String, User> kafkaTemplate;
    public void sendMessage(User data) {

        Message<User> message = MessageBuilder
                .withPayload(data)
                //.setHeader(KafkaHeaders.TOPIC, "javaguides_json")
                .setHeader(KafkaHeaders.TOPIC, topicJsonName)
                .build();

        kafkaTemplate.send(message);


src/main/java/net/javaguides/springbootkafkatutorial/kafka/KafkaConsumer.java

@Service
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    //@KafkaListener(topics = "javaguides", groupId = "myGroup")
    @KafkaListener(topics = "${spring.kafka.topic.name}", groupId = "myGroup")
    public void consume(String message){
        LOGGER.info(String.format("Message received -> %s", message));

    }
}


src/main/java/net/javaguides/springbootkafkatutorial/kafka/KafkaProducer.java

@Service
public class KafkaProducer {

    @Value("${spring.kafka.topic.name}")
    private String topicName;


    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    private KafkaTemplate<String, String> kafkaTemplate;
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {

    public void sendMessage(String message){
        LOGGER.info(String.format("Message sent %s", message));
        //kafkaTemplate.send("javaguides", message);
        kafkaTemplate.send(topicName, message);
    }
}


URLに入力
ProducerとConsumerに表示されます。
Postmanでテストします。Json形式で送られるデータです。
Jsonの形式で表示されます。
Producer、Consumer両方に表示されています。
まとめ:
kafka-server、kafka-console-consumer、zookeeper-server、三つのサーバーを起動していたのです。
今日の勉強もコミット・プシュしました。



最後に


今日はApache Kafkaの基本概念と実装過程をまとめました。Kafkaは、大規模な分散システムで安定性と拡張性があり、注目を集めているメッセージブローカープラットフォームです。これを使用することで、データストリーミングおよびイベントベースのアーキテクチャを実装することができます。
Producer、Broker、Consumer、Topic、Partition、Offset、Replica(Leader、Follow)など、いろいろ重要な概念が沢山あるため、勉強が結構必要です。次回はKafkaを利用し、現実のWikiMediaプロジェクトを実践したいと思います!


エンジニアファーストの会社 株式会社CRE-CO
ソンさん


【参考】


  • [Udemy] Building Microservices with Spring Boot & Spring Cloud


この記事が気に入ったらサポートをしてみませんか?