Spring Boot Kafka生产者消费者示例教程

x33g5p2x  于2022-10-12 转载在 Spring  
字(5.7k)|赞(0)|评价(0)|浏览(932)

在本教程中,我们将学习如何在Spring Boot Kafka项目中创建Kafka ProducerConsumer
如果您是Apache Kafka的新手,那么您应该看看我的文章——ApacheKafka核心概念。
Spring团队提供Spring for Apache Kafka依赖性,以配合开发基于Kafka的消息传递解决方案
在本教程中,我们使用Kafka作为消息传递系统,在生产者和消费者之间发送消息。

1.安装和设置Apache Kafka

1.从官方网站https://kafka.apache.org/downloads下载Kafka
2.在本地文件系统中解压缩Kafka zip
运行以下命令,以正确的顺序启动所有服务:
3.启动Zookeeper服务
使用以下命令启动Zookeeper服务:

# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties

4.启动Kafka Broker
打开另一个终端会话并运行以下命令以启动Kafka代理:

# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties

一旦所有服务成功启动,您将有一个基本的Kafka环境正在运行并准备好使用。

2.在IntelliJ中创建和设置Spring Boot项目

使用https://start.spring.io/创建Spring引导项目
添加依赖项:

<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
</dependency>
   
 <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
</dependency>

Import in IntelliJ并运行spring boot应用程序

3.在应用程序中配置Kafka Producer和Consumer。属性文件

application.properties文件中,添加Kafka代理地址以及与消费者和生产者相关的配置。
打开application.properties文件及其以下内容:

spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: group-id
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer

让我们了解一下上述由Kafka提供的弹簧靴属性:
spring.kafka.consumer.group-id-指定一个唯一字符串,用于标识此消费者所属的消费者组。
spring.kafka.consumer.auto-offset-reset property-指定当Kafka中没有初始偏移量或服务器上不再存在当前偏移量时(例如,因为该数据已被删除)要执行的操作:

  • earliest:自动将偏移重置为最早偏移
  • latest:自动将偏移重置为最新偏移
  • none:如果没有为消费者组找到以前的偏移量,则向消费者抛出异常
  • anything else::向消费者抛出一个异常。

spring.kafka.consumer.key-deserializer-指定密钥的反序列化程序类。
spring.kafka.consumer.value-deserializer-指定值的反序列化程序类。
spring.kafka.producer.key-deserializer-指定密钥的序列化程序类。
spring.kafka.producer.value-deserializer-指定值的序列化程序类。

4.创建Kafka主题

要在启动时创建主题,请添加一个NewTopic类型的bean。如果主题已经存在,则忽略该bean。在这个例子中,我们将使用主题名“javaguides”。
让我们创建一个KafkaTopicConfig文件并添加以下内容:

package net.javaguides.springbootkafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic javaguidesTopic(){
        return TopicBuilder.name("javaguides")
                .build();
    }
}

5.创建Kafka制作人

创建一个制作人将写下我们关于这个主题的信息。

Kafka模板

好吧,Springboot为Spring的KafkaTemplate提供了一个自动配置,因此您可以直接在自己的bean中自动连接它。
例如:

package net.javaguides.springbootkafka.kafka;

import net.javaguides.springbootkafka.utils.AppConstants;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message){
        LOGGER.info(String.format("Message sent -> %s", message));
        kafkaTemplate.send(AppConstants.TOPIC_NAME, message);
    }
}

创建一个utils包,并在此包中创建包含以下内容的AppConstants

package net.javaguides.springbootkafka.utils;

public class AppConstants {
    public static final String TOPIC_NAME = "javaguides";
    public static final String GROUP_ID = "group_id";
}

KafKaProducer类使用KafkaTemplate向配置的主题名称发送消息。

6.创建REST API以发送消息

创建控制器包,在控制器包内创建KafkaProducerController,其中包含以下内容:

package net.javaguides.springbootkafka;

import net.javaguides.springbootkafka.kafka.KafkaProducer;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/v1/kafka")
public class KafkaProducerController {

    private KafkaProducer kafkaProducer;

    public KafkaProducerController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    @GetMapping("/publish")
    public ResponseEntity<String> publish(@RequestParam("message") String message){
        kafkaProducer.sendMessage(message);
        return ResponseEntity.ok("Message sent to kafka topic");
    }
}

通过命令行查看主题消息:

bin/kafka-console-consumer.sh --topic javaguides --from-beginning --bootstrap-server localhost:9092

确保更改主题名称。在我们的例子中,“javaguides”是主题名。

7.创建Kafka消费者

Kafka消费者 该服务将负责根据您自己的业务逻辑的需要读取消息并进行处理
要设置它,请输入以下内容:

package net.javaguides.springbootkafka.kafka;

import net.javaguides.springbootkafka.utils.AppConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafKaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafKaConsumer.class);

    @KafkaListener(topics = AppConstants.TOPIC_NAME,
                    groupId = AppConstants.GROUP_ID)
    public void consume(String message){
        LOGGER.info(String.format("Message received -> %s", message));
    }
}

在这里,我们告诉我们的方法void to consume(String message)订阅用户的主题,并将每个消息发送到应用程序日志。在实际应用程序中,您可以按照业务要求的方式处理消息。
KafkaListener端点:

@KafkaListener(topics = AppConstants.TOPIC_NAME,
                    groupId = AppConstants.GROUP_ID)
    public void consume(String message){
        LOGGER.info(String.format("Message received -> %s", message));
    }

8.演示

让我们运行Spring引导应用程序并进行演示。确保Zookeeper和Kafka服务已经启动并运行。
打开浏览器并点击以下链接以调用REST API:
http://localhost:8080/api/v1/kafka/publish?message=hello%20world


从命令行,可以查看主题消息:

您可以在控制台中查看主题消息:

结论

在本教程中,我们学习了如何在Spring Boot Kafka项目中创建Kafka ProducerConsumer,并提供了一个演示实况示例。

相关文章