我正在编写一个基于java的kafka消费者应用程序。我正在使用kafka客户端、SpringKafka和SpringBoot作为我的应用程序。虽然springboot让我可以轻松地编写kafka消费者(而不必真正编写concurrentkafkalistenercontainerfactory、consumerfactory等),但我希望能够为这些消费者定义/自定义一些属性。然而,我找不到一个简单的方法来使用SpringBoot。例如:我感兴趣的一些房产是-
ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
我在这里查看了springboot的预定义属性。
另外,基于前面的一个问题,我想在使用者上设置并发性,但是找不到一种配置、application.properties驱动的方法来使用spring boot实现。
一个明显的方法是定义 ConcurrentKafkaListenerContainerFactory, ConsumerFactory
在我的spring上下文中再次上课,并从那里开始工作。我想知道是否有一个更干净的方法来做这件事,特别是因为我用的是springboot。
版本-
Kafka客户端-0.10.0.0-sasl
SpringKafka-1.1.0.0版本
Spring Boot-1.5.10.释放
2条答案
按热度按时间nimxete21#
在您引用的url处,向下滚动至
SpringKafka-1.1.0.0版本
我建议至少升级到1.3.5版本;由于kip-62,它的线程模型简单得多。
编辑
使用boot2.0,您可以设置任意的producer、consumer、admin和common属性,如引导文档中所述。
对于boot 1.5,只有
spring.kafka.properties
如本文所述。这将设置生产者和消费者的属性,但是您可能会在日志中看到一些关于生产者未使用/不支持的属性的杂音。
或者,您可以简单地覆盖boot的使用者工厂,并根据需要添加属性。。。
5uzkadbs2#
我搜索了一下,发现https://github.com/spring-projects/spring-kafka/issues/604. 这个问题已经解决了https://docs.spring.io/spring-boot/docs/2.0.0.release/reference/htmlsingle/#boot-特色Kafka额外道具。但是是针对SpringBoot2.0版的。