在本教程中,我们将学习如何在Spring引导应用程序中使用Apache Kafka。我们将了解如何创建Kafka Producer、Topics、Consumer,以及如何使用Kafka代理在Producer和Consumer之间交换不同的数据格式(String和JSON)。
让我们从什么是Kafka开始?
Apache Kafka是一个开源分布式事件流平台,数千家公司使用它来实现高性能数据管道、流分析、数据集成和任务关键型应用程序。
要了解更多关于Apache Kafka的信息,请查看以下视频:
我们将讨论以下Apache Kafka核心概念:
1.Kafka集群
2.Kafka Broker
3.Kafka Producer
4.Kafka Consumer
5.Kafka Topic
6.Kafka Partitions
7.Kafka Offsets
8.Kafka消费组
要了解更多关于Apache Kafka核心概念的信息,请查看以下视频:
让我们从Kafka集群开始。
由于Kafka是一个分布式系统,因此它充当一个集群。Kafka集群由一组代理组成。一个集群至少有3个代理。
下图显示了带有三个Kafka Brocker的Kafka集群:
代理是Kafka服务器。它只是Kafka服务器的一个有意义的名称。这个名字也很有意义,因为Kafka所做的只是充当生产者和消费者之间的消息代理
生产者和消费者不直接互动。他们使用Kafka服务器作为代理或代理来交换消息。
下图显示了一个Kafka代理,它作为代理或代理在生产者和消费者之间交换消息:
Producer是发送消息的应用程序。它不会直接向收件人发送消息。它只向Kafka服务器发送消息。
下图显示了Producer直接向Kafka代理发送消息:
Consumer是一个从Kafka服务器读取消息的应用程序。
如果生产者发送数据,他们一定是在向某人发送数据,对吗?消费者是接收者。但请记住,生产者不会将数据发送到收件人地址。他们只是把它发送到Kafka服务器
任何对该数据感兴趣的人都可以从Kafka服务器获取该数据。因此,任何从Kafka服务器请求数据的应用程序都是使用者,只要有读取权限,它们就可以请求任何生产者发送数据。
下图显示了生产者直接向Kafka代理发送消息,消费者消费或读取来自Kafka中介的消息:
我们了解到生产商向Kafka Producer发送数据。然后消费者可以向Kafka Producer索要数据。但问题是,哪些数据?我们需要一些标识机制来向代理请求数据。接下来是Kafka Topic。
下图显示了在Kafka代理中创建的两个Topic:
Kafka主题被划分为多个分区,这些分区以不变的顺序包含记录。
Kafka Brokers将存储主题的消息。但是数据的容量可能是巨大的,可能不可能存储在一台计算机中。因此,由于Kafka是一个分布式系统,它将被划分为多个部分并分布在多台计算机之间。
下图显示了Kafka的主题进一步划分为多个分区:
偏移量是消息到达分区时给予消息的ID序列。一旦指定了偏移量,它将永远不会更改。第一条消息获得偏移量零。下一条消息接收偏移量1,依此类推。
消费者组包含一个或多个共同处理消息的消费者。
在本节教程中,我们将学习如何在Spring Boot Kafka项目中创建Kafka Producer和Consumer。
Spring团队提供Spring for Apache Kafka依赖性,以配合开发基于Kafka的消息传递解决方案
在本教程中,我们使用Kafka作为消息传递系统,在生产者和消费者之间发送消息。
1.从官方网站https://kafka.apache.org/downloads下载Kafka
2.在本地文件系统中解压缩Kafka zip
运行以下命令,以正确的顺序启动所有服务:
3.启动Zookeeper服务
使用以下命令启动Zookeeper服务:
# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
4.启动Kafka Broker
打开另一个终端会话并运行以下命令以启动Kafka代理:
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties
一旦所有服务成功启动,您将有一个基本的Kafka环境正在运行并准备好使用。
使用https://start.spring.io/创建Spring引导项目
添加依赖项:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Import in IntelliJ并运行spring boot应用程序
在application.properties
文件中,添加Kafka代理地址以及与消费者和生产者相关的配置。
打开application.properties
文件及其以下内容:
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer
让我们了解一下上述由Kafka提供的弹簧靴属性:spring.kafka.consumer.group-id
-指定一个唯一字符串,用于标识此消费者所属的消费者组。spring.kafka.consumer.auto-offset-reset property
-指定当Kafka中没有初始偏移量或服务器上不再存在当前偏移量时(例如,因为该数据已被删除)要执行的操作:
spring.kafka.consumer.key-deserializer
-指定密钥的反序列化程序类。spring.kafka.consumer.value-deserializer
-指定值的反序列化程序类。spring.kafka.producer.key-deserializer
-指定密钥的序列化程序类。spring.kafka.producer.value-deserializer
-指定值的序列化程序类。
要在启动时创建主题,请添加一个NewTopic
类型的bean。如果主题已经存在,则忽略该bean。在这个例子中,我们将使用主题名“javaguides”。
让我们创建一个KafkaTopicConfig
文件并添加以下内容:
package net.javaguides.springbootkafka;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic javaguidesTopic(){
return TopicBuilder.name("javaguides")
.build();
}
}
创建一个制作人将写下我们关于这个主题的信息。
好吧,Springboot为Spring的KafkaTemplate
提供了一个自动配置,因此您可以直接在自己的bean中自动连接它。
例如:
package net.javaguides.springbootkafka.kafka;
import net.javaguides.springbootkafka.utils.AppConstants;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message){
LOGGER.info(String.format("Message sent -> %s", message));
kafkaTemplate.send(AppConstants.TOPIC_NAME, message);
}
}
创建一个e1d11d1包,并在此包中创建包含以下内容的AppConstants
:
package net.javaguides.springbootkafka.utils;
public class AppConstants {
public static final String TOPIC_NAME = "javaguides";
public static final String GROUP_ID = "group_id";
}
KafKaProducer
类使用KafkaTemplate
向配置的主题名称发送消息。
创建控制器包,在控制器包内创建KafkaProducerController
,其中包含以下内容:
package net.javaguides.springbootkafka;
import net.javaguides.springbootkafka.kafka.KafkaProducer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/v1/kafka")
public class KafkaProducerController {
private KafkaProducer kafkaProducer;
public KafkaProducerController(KafkaProducer kafkaProducer) {
this.kafkaProducer = kafkaProducer;
}
@GetMapping("/publish")
public ResponseEntity<String> publish(@RequestParam("message") String message){
kafkaProducer.sendMessage(message);
return ResponseEntity.ok("Message sent to kafka topic");
}
}
bin/kafka-console-consumer.sh --topic javaguides --from-beginning --bootstrap-server localhost:9092
确保更改主题名称。在我们的例子中,“javaguides”是主题名。
Kafka消费者 该服务将负责根据您自己的业务逻辑的需要读取消息并进行处理
要设置它,请输入以下内容:
package net.javaguides.springbootkafka.kafka;
import net.javaguides.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafKaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafKaConsumer.class);
@KafkaListener(topics = AppConstants.TOPIC_NAME,
groupId = AppConstants.GROUP_ID)
public void consume(String message){
LOGGER.info(String.format("Message received -> %s", message));
}
}
在这里,我们告诉我们的方法void to consume(String message)订阅用户的主题,并将每个消息发送到应用程序日志。在实际应用程序中,您可以按照业务要求的方式处理消息。
KafkaListener端点:
@KafkaListener(topics = AppConstants.TOPIC_NAME,
groupId = AppConstants.GROUP_ID)
public void consume(String message){
LOGGER.info(String.format("Message received -> %s", message));
}
让我们运行Spring引导应用程序并进行演示。确保Zookeeper和Kafka服务已经启动并运行。
打开浏览器并点击以下链接以调用REST API:
http://localhost:8080/api/v1/kafka/publish?message=hello%20world
从命令行,可以查看主题消息:
您可以在控制台中查看主题消息:
查看以下教程,了解如何在Spring引导应用程序中使用Apache Kafka在Producer和Consumer之间交换JSON消息。
Spring Boot Kafka JsonSerializer和JsonDeserializer
https://github.com/RameshMF/springboot-kafka-course/tree/main/springboot-kafka-tutorial
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://www.javaguides.net/2022/06/spring-boot-apache-kafka-tutorial.html
内容来源于网络,如有侵权,请联系作者删除!