【大量データを処理する分散メッセージキュー】Apache Kafkaの基本的実装 -Springboot3 第13回
はじめに
こんにちは、今日は大規模のシステムで使われるApach Kafkaについて勉強します!Kafkaは大量データを処理する分散メッセージキュー・システムです。
Kafkaを簡単に紹介する動画を見ましょう。
(英語)
Apache Kafkaの概要
アパッチ・カフカは、分散型のリアルタイムデータストリーム処理システムで、以下のような構造を持っています
プロデューサー(Producer): データを生成し、それをカフカに送信する役割を果たします。例えば、ログデータやセンサーデータを生成するシステムがプロデューサーとなります。
ブローカー(Broker): カフカのサーバーで、メッセージを保存し管理します。複数のブローカーがクラスタを構成し、データの分散処理をサポートします。
トピック(Topic): データのカテゴリを意味し、メッセージが属するテーマです。例えば、「ログ」、「イベント」などがトピックとなります。
パーティション(Partition): 各トピックは1つ以上のパーティションに分割されます。パーティションはデータの物理的な分離を意味し、並列処理と安定性を提供します。
オフセット(Offset):メッセージストリーム内の各メッセージを識別するための概念です。各メッセージには、オフセットと呼ばれる固有の識別子が割り当てられています。これはメッセージが属するパーティション内での位置を示します。
ズーキーパー(Zookeeper): カフカクラスタの構成やリーダーの選出などを管理する分散コーディネータです。
コンシューマー(Consumer): データを消費し、処理する役割を果たします。複数のコンシューマーが同時に1つのトピックからメッセージを消費できます。
要約すると、プロデューサーがデータを生成して特定のトピックに送信します。
ブローカーがそのデータを管理し、トピックのパーティションに分配します。
その後、コンシューマーは特定のトピックのパーティションからデータを消費して処理します。
Apache Kafkaの実装過程
1.Kafkaのインストールとプロジェクト生成
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
.\bin\windows\kafka-server-start.bat .\config\server.properties
.\bin\windows\kafka-topics.bat --create --topic topic-example --bootstrap-server localhost:9092
上記のコマンドで「topic」を作りましょう。
--create: トピックを作成するオプションです。
--topic topic-example: 作成するトピックの名前を指定します。
--bootstrap-server localhost:9092: Kafkaクラスタへの接続情報を指定しま。
.\bin\windows\kafka-console-producer.bat --topic topic-example --bootstrap-server localhost:9092
このコマンドは、Kafkaのコンソールプロデューサーを起動するためのものです。指定されたトピック(topic-example)およびブートストラップサーバー(localhost:9092)にメッセージを送信します。
.\bin\windows\kafka-console-consumer.bat --topic topic-example --from-beginning --bootstrap-server localhost:9092
いままでのコマンドをおさらいします!
STEP 1: DOWNLOAD AND INSTALL KAFKA
https://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz
STEP 2: START THE KAFKA ENVIRONMENT
# Start the ZooKeeper service
C:\Users\***\Downloads\kafka>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
# Start the Kafka broker service
C:\Users\***\Downloads\kafka>.\bin\windows\kafka-server-start.bat .\config\server.properties
STEP 3: CREATE A TOPIC TO STORE YOUR EVENTS
C:\Users\***\Downloads\kafka>.\bin\windows\kafka-topics.bat --create --topic topic_demo --bootstrap-server localhost:9092
STEP 4: WRITE SOME EVENTS INTO THE TOPIC
C:\Users\***\Downloads\kafka>.\bin\windows\kafka-console-producer.bat --topic topic_demo --bootstrap-server localhost:9092
>hello world
>topic demo
STEP 5: READ THE EVENTS
C:\Users\***\Downloads\kafka>.\bin\windows\kafka-console-consumer.bat --topic topic_demo --from-beginning --bootstrap-server localhost:9092
hello world
topic demo
2.ProducerとConsumerの設定
Kafkaの公式ドキュメントを確認します。
application.propertiesに以下の設定をします。
spring.kafka.consumer.bootstrap-severs: logcalhost:9092
spring.kafka.consumer.group-id: myGroup
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer:org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value -serializer:org.apache.kafka.common.serialization.StringSerializer
コンシューマーの設定
spring.kafka.consumer.bootstrap-servers: logcalhost:9092
コンシューマが接続するKafkaブローカーのアドレスを指定します。ここではlocalhostのポート9092にあるブローカーに接続します。
spring.kafka.consumer.group-id: myGroup
コンシューマのグループIDを設定します。グループIDは処理を並列化し、複数のコンシューマ間でタスクを分散するために使用されます。
spring.kafka.consumer.auto-offset-reset: earliest
初期オフセットがないか、または現在のオフセットがサーバー上に存在しない場合の動作を決定します。ここでは最初のオフセットからデータを読むように設定されています。
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Kafkaメッセージのキーを逆シリアライズするためのクラスを指定します。ここではキーを文字列として逆シリアライズします。
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Kafkaメッセージの値を逆シリアライズするためのクラスを指定します。値を文字列として逆シリアライズします。
プロデューサの設定
spring.kafka.producer.bootstrap-servers: localhost:9092
プロデューサが接続するKafkaブローカーのアドレスを指定します。
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringDeserializer
Kafkaメッセージのキーをシリアライズするためのクラスを指定します。ここではキーを文字列としてシリアライズします。
spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringDeserializer
Kafkaメッセージの値をシリアライズするためのクラスを指定します。ここでは値を文字列としてシリアライズします。
3.Topic・Producer・SendMessageController生成
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic javaguidesTopic(){
return TopicBuilder.name("javaguides")
.build();
}
}
@Service
public class KafkaProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
private KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message){
LOGGER.info(String.format("Message sent %s", message));
kafkaTemplate.send("javaguides", message);
}
}
private static final Logger LOGGER: ロギング用のLoggerオブジェクトを初期化しています。これは、メッセージが送信されたことをログに記録します。
private KafkaTemplate<String, String> kafkaTemplate: Kafkaメッセージを送信するために使用される KafkaTemplate オブジェクトを保持しています。
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate): コンストラクタです。KafkaTemplate オブジェクトを受け取り、インスタンス変数に設定します。
public void sendMessage(String message): メッセージを送信するためのメソッドです。メソッドは引数としてメッセージを受け取り、それをKafkaのトピックに送信します。
LOGGER.info(String.format("Message sent %s", message));: メッセージが正常に送信されたことをログに記録しています。
kafkaTemplate.send("javaguides", message);: kafkaTemplate を使用して、指定されたトピック("javaguides")にメッセージを送信しています。
@RestController
@RequestMapping("/api/v1/kafka")
public class MessageController {
private KafkaProducer kafkaProducer;
@Autowired
public MessageController(KafkaProducer kafkaProducer) {
this.kafkaProducer = kafkaProducer;
}
// http:localhost:8080/api/v1/kafka/publish?message=hello world
@GetMapping("/publish")
public ResponseEntity<String> publish(@RequestParam("message") String message){
kafkaProducer.sendMessage(message);
return ResponseEntity.ok("Message sent to the topic");
}
}
bin\windows\kafka-console-consumer.bat --topic javaguides --from-beginning --bootstrap-server localhost:9092
4.Consumerの生成
@Service
public class KafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
@KafkaListener(topics = "javaguides", groupId = "myGroup")
public void consume(String message){
LOGGER.info(String.format("Message received -> %s", message));
}
}
@KafkaListener(topics = "javaguides", groupId = "myGroup"): @KafkaListener アノテーションを使用して、指定されたトピック("javaguides")およびグループID("myGroup")でメッセージを受信するメソッドを指定しています。
5.Producer・ConsumerのJSON SerializerとDeserializer設定
application.properties
spring.kafka.consumer.bootstrap-severs: logcalhost:9092
spring.kafka.consumer.group-id: myGroup
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer:org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer:org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
コンシューマーの値(メッセージの本文)のデシリアライザを指定しています。
ここでは、JsonDeserializerを使用してJSON形式のメッセージをデシリアライズするように設定されています。
spring.kafka.consumer.properties.spring.json.trusted.packages=*
JsonDeserializerの設定で、受信したJSONメッセージをデシリアライズする際に信頼されるJavaパッケージを指定しています。
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
プロデューサーの値(メッセージの本文)のシリアライザを指定しています。
ここでは、JsonSerializerを使用してJavaオブジェクトをJSON形式に変換するように設定されています。
6.Simple POJOをSerialize/Deserializeへ
public class User {
private int id;
private String firstName;
private String lastName;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", firstName='" + firstName + '\'' +
", lastName='" + lastName + '\'' +
'}';
}
}
toString() メソッド
オブジェクトの文字列表現を提供するために toString() メソッドがオーバーライドされています。これにより、オブジェクトを文字列に変換してログなどに表示する際に便利です。
7.JSON Messageを生成するProducer作成
@Service
public class JsonKafkaProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(JsonKafkaProducer.class);
private KafkaTemplate<String, User> kafkaTemplate;
public JsonKafkaProducer(KafkaTemplate<String, User> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(User data) {
LOGGER.info(String.format("Message sent -> %s", data.toString()));
Message<User> message = MessageBuilder
.withPayload(data)
.setHeader(KafkaHeaders.TOPIC, "javaguides")
.build();
kafkaTemplate.send(message);
}
}
public void sendMessage(User data): JSON形式のメッセージを送信するためのメソッドです。
LOGGER.info(String.format("Message sent -> %s", data.toString()));: メッセージが正常に送信されたことをログに記録しています。
Message<User> message = MessageBuilder...: MessageBuilder を使用して、User オブジェクトをペイロードとして持つKafkaメッセージを構築しています。
kafkaTemplate.send(message);: kafkaTemplate を使用して、構築したメッセージをKafkaに送信しています。
8.JSONオブジェクトを送るAPI作成
@RestController
@RequestMapping("/api/v1/kafka")
public class JsonMessageController {
private JsonKafkaProducer kafkaProducer;
public JsonMessageController(JsonKafkaProducer kafkaProducer) {
this.kafkaProducer = kafkaProducer;
}
@PostMapping("/publish")
public ResponseEntity<String> publish(@RequestBody User user) {
kafkaProducer.sendMessage(user);
return ResponseEntity.ok("Json Message sent to kafka topic");
}
}
この JsonMessageController クラスは、JSON形式のメッセージをKafkaに送信するためのコントローラーです。
HTTP POSTリクエストで受け取ったJSON形式のユーザーオブジェクトを JsonKafkaProducer を使用してKafkaに送信します。メッセージの送信が成功した場合、ResponseEntity を使用してHTTPステータス200(OK)とメッセージを返します。
bin\windows\kafka-console-consumer.bat --topic javaguides_json --from-beginning --bootstrap-server localhost:9092
9.JSON Messageを消費するConsumer作成
@Service
public class JsonKafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(JsonKafkaConsumer.class);
@KafkaListener(topics = "javaguides_json", groupId = "myGroup")
public void consume(User user) {
LOGGER.info(String.format("Json message recieved -> %s", user.toString()));
}
}
“User user” 部分で、Kafkaが提供するJsonDeserializerはUser JsonオブジェクトをJava Userオブジェクトで変換する。
10.リファクタリング:Topic Nameの外部化(ハードコーディング除去)
application.propertiesに以下の設定を追加します。
ハードコーディングされているコードを改善します。
spring.kafka.topic.name=javaguides
spring.kafka.topic-json.name=javaguides_json
以下のようにコードを書き直します。(既存のコードはコメント処理)
src/main/java/net/javaguides/springbootkafkatutorial/config/KafkaTopicConfig.java
Configuration
public class KafkaTopicConfig {
@Value("${spring.kafka.topic.name}")
private String topicName;
@Value("${spring.kafka.topic-json.name}")
private String topicJsonName;
@Bean
public NewTopic javaguidesTopic(){
//return TopicBuilder.name("javaguides")
return TopicBuilder.name(topicName)
.build();
}
@Bean
public NewTopic javaguidesJsonTopic(){
//return TopicBuilder.name("javaguides_json")
return TopicBuilder.name(topicJsonName)
.build();
}
}
src/main/java/net/javaguides/springbootkafkatutorial/kafka/JsonKafkaConsumer.java
@Service
public class JsonKafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(JsonKafkaConsumer.class);
//@KafkaListener(topics = "javaguides_json", groupId = "myGroup")
@KafkaListener(topics = "${spring.kafka.topic-json.name}", groupId = "${spring.kafka.consumer.group-id}")
public void consume(User user) {
LOGGER.info(String.format("Json message recieved -> %s", user.toString()));
}
src/main/java/net/javaguides/springbootkafkatutorial/kafka/JsonKafkaProducer.java
@Service
public class JsonKafkaProducer {
@Value("${spring.kafka.topic-json.name}")
private String topicJsonName;
private static final Logger LOGGER = LoggerFactory.getLogger(JsonKafkaProducer.class);
private KafkaTemplate<String, User> kafkaTemplate;
public void sendMessage(User data) {
Message<User> message = MessageBuilder
.withPayload(data)
//.setHeader(KafkaHeaders.TOPIC, "javaguides_json")
.setHeader(KafkaHeaders.TOPIC, topicJsonName)
.build();
kafkaTemplate.send(message);
src/main/java/net/javaguides/springbootkafkatutorial/kafka/KafkaConsumer.java
@Service
public class KafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
//@KafkaListener(topics = "javaguides", groupId = "myGroup")
@KafkaListener(topics = "${spring.kafka.topic.name}", groupId = "myGroup")
public void consume(String message){
LOGGER.info(String.format("Message received -> %s", message));
}
}
src/main/java/net/javaguides/springbootkafkatutorial/kafka/KafkaProducer.java
@Service
public class KafkaProducer {
@Value("${spring.kafka.topic.name}")
private String topicName;
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
private KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
public void sendMessage(String message){
LOGGER.info(String.format("Message sent %s", message));
//kafkaTemplate.send("javaguides", message);
kafkaTemplate.send(topicName, message);
}
}
最後に
今日はApache Kafkaの基本概念と実装過程をまとめました。Kafkaは、大規模な分散システムで安定性と拡張性があり、注目を集めているメッセージブローカープラットフォームです。これを使用することで、データストリーミングおよびイベントベースのアーキテクチャを実装することができます。
Producer、Broker、Consumer、Topic、Partition、Offset、Replica(Leader、Follow)など、いろいろ重要な概念が沢山あるため、勉強が結構必要です。次回はKafkaを利用し、現実のWikiMediaプロジェクトを実践したいと思います!
エンジニアファーストの会社 株式会社CRE-CO
ソンさん
【参考】
[Udemy] Building Microservices with Spring Boot & Spring Cloud
この記事が気に入ったらサポートをしてみませんか?