在本教程中,我们将学习如何在Spring Boot Kafka项目中创建Kafka Producer和Consumer。
如果您是Apache Kafka的新手,那么您应该看看我的文章——ApacheKafka核心概念。
Spring团队提供Spring for Apache Kafka依赖性,以配合开发基于Kafka的消息传递解决方案
在本教程中,我们使用Kafka作为消息传递系统,在生产者和消费者之间发送消息。
1.从官方网站https://kafka.apache.org/downloads下载Kafka
2.在本地文件系统中解压缩Kafka zip
运行以下命令,以正确的顺序启动所有服务:
3.启动Zookeeper服务
使用以下命令启动Zookeeper服务:
# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
4.启动Kafka Broker
打开另一个终端会话并运行以下命令以启动Kafka代理:
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties
一旦所有服务成功启动,您将有一个基本的Kafka环境正在运行并准备好使用。
使用https://start.spring.io/创建Spring引导项目
添加依赖项:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Import in IntelliJ并运行spring boot应用程序
在application.properties
文件中,添加Kafka代理地址以及与消费者和生产者相关的配置。
打开application.properties
文件及其以下内容:
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer
让我们了解一下上述由Kafka提供的弹簧靴属性:spring.kafka.consumer.group-id
-指定一个唯一字符串,用于标识此消费者所属的消费者组。spring.kafka.consumer.auto-offset-reset property
-指定当Kafka中没有初始偏移量或服务器上不再存在当前偏移量时(例如,因为该数据已被删除)要执行的操作:
spring.kafka.consumer.key-deserializer
-指定密钥的反序列化程序类。spring.kafka.consumer.value-deserializer
-指定值的反序列化程序类。spring.kafka.producer.key-deserializer
-指定密钥的序列化程序类。spring.kafka.producer.value-deserializer
-指定值的序列化程序类。
要在启动时创建主题,请添加一个NewTopic
类型的bean。如果主题已经存在,则忽略该bean。在这个例子中,我们将使用主题名“javaguides”。
让我们创建一个KafkaTopicConfig
文件并添加以下内容:
package net.javaguides.springbootkafka;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic javaguidesTopic(){
return TopicBuilder.name("javaguides")
.build();
}
}
创建一个制作人将写下我们关于这个主题的信息。
好吧,Springboot为Spring的KafkaTemplate
提供了一个自动配置,因此您可以直接在自己的bean中自动连接它。
例如:
package net.javaguides.springbootkafka.kafka;
import net.javaguides.springbootkafka.utils.AppConstants;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message){
LOGGER.info(String.format("Message sent -> %s", message));
kafkaTemplate.send(AppConstants.TOPIC_NAME, message);
}
}
创建一个utils包,并在此包中创建包含以下内容的AppConstants
:
package net.javaguides.springbootkafka.utils;
public class AppConstants {
public static final String TOPIC_NAME = "javaguides";
public static final String GROUP_ID = "group_id";
}
KafKaProducer
类使用KafkaTemplate
向配置的主题名称发送消息。
创建控制器包,在控制器包内创建KafkaProducerController
,其中包含以下内容:
package net.javaguides.springbootkafka;
import net.javaguides.springbootkafka.kafka.KafkaProducer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/v1/kafka")
public class KafkaProducerController {
private KafkaProducer kafkaProducer;
public KafkaProducerController(KafkaProducer kafkaProducer) {
this.kafkaProducer = kafkaProducer;
}
@GetMapping("/publish")
public ResponseEntity<String> publish(@RequestParam("message") String message){
kafkaProducer.sendMessage(message);
return ResponseEntity.ok("Message sent to kafka topic");
}
}
bin/kafka-console-consumer.sh --topic javaguides --from-beginning --bootstrap-server localhost:9092
确保更改主题名称。在我们的例子中,“javaguides”是主题名。
Kafka消费者 该服务将负责根据您自己的业务逻辑的需要读取消息并进行处理
要设置它,请输入以下内容:
package net.javaguides.springbootkafka.kafka;
import net.javaguides.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafKaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(KafKaConsumer.class);
@KafkaListener(topics = AppConstants.TOPIC_NAME,
groupId = AppConstants.GROUP_ID)
public void consume(String message){
LOGGER.info(String.format("Message received -> %s", message));
}
}
在这里,我们告诉我们的方法void to consume(String message)订阅用户的主题,并将每个消息发送到应用程序日志。在实际应用程序中,您可以按照业务要求的方式处理消息。
KafkaListener端点:
@KafkaListener(topics = AppConstants.TOPIC_NAME,
groupId = AppConstants.GROUP_ID)
public void consume(String message){
LOGGER.info(String.format("Message received -> %s", message));
}
让我们运行Spring引导应用程序并进行演示。确保Zookeeper和Kafka服务已经启动并运行。
打开浏览器并点击以下链接以调用REST API:
http://localhost:8080/api/v1/kafka/publish?message=hello%20world
从命令行,可以查看主题消息:
您可以在控制台中查看主题消息:
在本教程中,我们学习了如何在Spring Boot Kafka项目中创建Kafka Producer和Consumer,并提供了一个演示实况示例。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://www.javaguides.net/2022/05/spring-boot-kafka-producer-consumer-example-tutorial.html
内容来源于网络,如有侵权,请联系作者删除!