在前面的教程中,我们已经了解了如何在Spring引导项目中在Kafka Producer和Kafka Consumer之间交换String格式消息。
在本教程中,我们将学习如何使用Spring Kafka库提供的JsonSerializer和JsonDeserializer类来存储和检索Apache Kafka主题中的JSON并返回Java模型对象。
如果您是Apache Kafka的新手,那么您应该看看我的文章——ApacheKafka核心概念。
基本上,您将学习如何以JSON字节[]的形式向Apache Kafka发送和接收Java对象。
Apache Kafka存储和传输字节[]。有许多内置的序列化器和反序列化器,但不包括任何针对JSON的序列化器。Spring Kafka创建了一个JsonSerializer和JsonDeserializer,我们可以使用它们将Java对象转换为JSON或从JSON转换为Java对象。
我们将使用JsonSerializer将Java对象作为JSON字节[]发送到Kafka主题。之后,我们将配置如何接收JSON字节[],并使用JsonDeserializer将其自动转换为Java对象。
1.从官方网站https://kafka.apache.org/downloads下载Kafka
2.在本地文件系统中解压缩Kafka zip
运行以下命令,以正确的顺序启动所有服务:
3.启动Zookeeper服务
使用以下命令启动Zookeeper服务:
# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
4.启动Kafka Broker
打开另一个终端会话并运行以下命令以启动Kafka代理:
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties
一旦所有服务成功启动,您将有一个基本的Kafka环境正在运行并准备好使用。
使用https://start.spring.io/创建Spring引导项目
添加依赖项:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Import in IntelliJ并运行spring boot应用程序
在应用程序中。属性文件,添加Kafka代理地址以及与消费者和生产者相关的配置。
打开application.properties
文件,让我们配置Kafka Producer和Consumer以交换JSON消息:
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
我们正在使用以下Consumer属性将JSON转换为Java对象:
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
我们正在使用以下Producer属性将Java对象转换为JSON:
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
让我们了解一下上述属性的含义。spring.kafka.consumer.bootstrap-servers
—用于建立到Kafka集群的初始连接的主机:端口对的逗号分隔列表。覆盖消费者的全局属性。spring.kafka.consumer.group-id
-标识此消费者所属的消费者组的唯一字符串。spring.kafka.consumer.auto-offset-reset
-当Kafka中没有初始偏移量或服务器上不再存在当前偏移量时,该怎么办。spring.kafka.consumer.key-deserializer
-密钥的反序列化程序类。spring.kafka.consumer.value-deserializer
-值的反序列化程序类。spring.kafka.producer.key-serializer
-密钥的序列化程序类。spring.kafka.producer.value-serializer
-值的序列化程序类
要在启动时创建主题,请添加NewTopic类型的bean。如果主题已经存在,则忽略该bean。在这个例子中,我们将使用主题名“javaguides”。
让我们创建一个KafkaTopicConfig文件并添加以下内容:
package net.javaguides.springbootkafka;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic javaguidesTopic(){
return TopicBuilder.name("javaguides")
.build();
}
}
让我们创建一个User
类来向Kafka主题发送和接收User对象。
那么,User
实例将被JsonSerializer
序列化为一个字节数组。Kafka最终将这个字节数组存储到特定主题的给定分区中。
在反序列化期间,JsonDeserializer
用于从Kafka接收JSON作为字节数组,将其转换为JSON字节数组到User对象,并将其返回给应用程序。
package net.javaguides.springbootkafka.payload;
public class User {
private int id;
private String firstName;
private String lastName;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", firstName='" + firstName + '\'' +
", lastName='" + lastName + '\'' +
'}';
}
}
让我们创建Kafka Producer以使用Spring Kafka生成JSON消息。
好吧,Spring引导为Spring的KafkaTemplate提供了一个自动配置,所以您可以直接在自己的bean中自动连接它。
package net.javaguides.springbootkafka.kafka;
import net.javaguides.springbootkafka.payload.User;
import net.javaguides.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
@Autowired
private KafkaTemplate<String, User> kafkaTemplate;
public void sendMessage(User data){
LOGGER.info(String.format("Message sent -> %s", data.toString()));
Message<User> message = MessageBuilder
.withPayload(data)
.setHeader(KafkaHeaders.TOPIC, AppConstants.TOPIC_NAME)
.build();
kafkaTemplate.send(message);
}
}
让我们从向Kafka Topic发送User对象开始。
**注意:**我们创建了一个KafkaTemplate<String, User>
,因为我们将Java对象发送到Kafka主题,该主题将自动转换为JSON字节[]。
在本例中,我们使用1d14d1e创建了ae1d13d。添加我们要发送消息的主题也很重要。
让我们创建一个简单的POST REST API,将用户信息作为JSON对象发送。
我们将创建一个POST REST API来将完整的User对象作为JSON发布,而不是发送消息字符串,这样Kafka生产者就可以将User对象写入Kafka主题。
package net.javaguides.springbootkafka.controller;
import net.javaguides.springbootkafka.kafka.KafkaProducer;
import net.javaguides.springbootkafka.payload.User;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/v1/kafka")
public class KafkaProducerController {
private KafkaProducer kafkaProducer;
public KafkaProducerController(KafkaProducer kafkaProducer) {
this.kafkaProducer = kafkaProducer;
}
@PostMapping("/publish")
public ResponseEntity<String> publish(@RequestBody User user){
kafkaProducer.sendMessage(user);
return ResponseEntity.ok("Message sent to kafka topic");
}
}
让我们创建一个Kafka消费者来接收来自主题的JSON消息。在KafkaConsumer中,我们只需要在方法中添加User Java Object作为参数。
package net.javaguides.springbootkafka.kafka;
import net.javaguides.springbootkafka.payload.User;
import net.javaguides.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafKaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafKaConsumer.class);
@KafkaListener(topics = AppConstants.TOPIC_NAME,
groupId = AppConstants.GROUP_ID)
public void consume(User data){
LOGGER.info(String.format("Message received -> %s", data.toString()));
}
}
让我们运行Spring引导应用程序并进行演示。确保Zookeeper和Kafka服务已经启动并运行。
让我们使用Postman客户端进行POST REST API调用:
观察控制台日志:
在本教程中,我们学习了如何使用Spring Kafka库提供的JsonSerializer和JsonDeserializer类来存储和检索Apache Kafka主题中的JSON以及返回Java模型对象。
https://github.com/RameshMF/springboot-kafka-course/tree/main/springboot-kafka-tutorial
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
内容来源于网络,如有侵权,请联系作者删除!