如何使用spring boot设置kafka使用者并发性

kr98yfug  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(366)

我正在编写一个基于java的kafka消费者应用程序。我正在使用kafka客户端、SpringKafka和SpringBoot作为我的应用程序。虽然springboot让我可以轻松地编写kafka消费者(而不必真正编写concurrentkafkalistenercontainerfactory、consumerfactory等),但我希望能够为这些消费者定义/自定义一些属性。然而,我找不到一个简单的方法来使用SpringBoot。例如:我感兴趣的一些房产是-
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG 我在这里查看了springboot的预定义属性。
另外,基于前面的一个问题,我想在使用者上设置并发性,但是找不到一种配置、application.properties驱动的方法来使用spring boot实现。
一个明显的方法是定义 ConcurrentKafkaListenerContainerFactory, ConsumerFactory 在我的spring上下文中再次上课,并从那里开始工作。我想知道是否有一个更干净的方法来做这件事,特别是因为我用的是springboot。
版本-
Kafka客户端-0.10.0.0-sasl
SpringKafka-1.1.0.0版本
Spring Boot-1.5.10.释放

nimxete2

nimxete21#

在您引用的url处,向下滚动至

spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.

SpringKafka-1.1.0.0版本
我建议至少升级到1.3.5版本;由于kip-62,它的线程模型简单得多。
编辑
使用boot2.0,您可以设置任意的producer、consumer、admin和common属性,如引导文档中所述。

spring.kafka.consumer.properties.heartbeat.interval.ms

对于boot 1.5,只有 spring.kafka.properties 如本文所述。
这将设置生产者和消费者的属性,但是您可能会在日志中看到一些关于生产者未使用/不支持的属性的杂音。
或者,您可以简单地覆盖boot的使用者工厂,并根据需要添加属性。。。

@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
    Map<String, Object> consumerProps = properties.buildConsumerProperties();
    consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5_000);
    return new DefaultKafkaConsumerFactory<Object, Object>(consumerProps);
}
5uzkadbs

5uzkadbs2#

我搜索了一下,发现https://github.com/spring-projects/spring-kafka/issues/604. 这个问题已经解决了https://docs.spring.io/spring-boot/docs/2.0.0.release/reference/htmlsingle/#boot-特色Kafka额外道具。但是是针对SpringBoot2.0版的。

相关问题