如何使用与不同代理关联的多个kafka主题?

ax6ht2ek  于 2021-07-23  发布在  Java
关注(0)|答案(1)|浏览(339)

如何从与不同经纪人相关的多个Kafka主题中消费?
我有一个springboot应用程序,它需要使用两个主题,但是这些主题与不同的代理程序相关联。
我正在使用带有@listener注解的springkafka,我发现有两种方法可以使用与同一个代理相关联的主题,而不是不同的代理。不幸的是,我在springboot或springkafka文档中没有看到任何关于如何做到这一点的帮助。

wz8daaqr

wz8daaqr1#

有几种方法可以做到这一点,不幸的是springboot和springkafka没有明确说明实现这一点的最佳实践。也有很多答案,所以解决消费从多个主题与同一个经纪人,它并不总是那么简单。

方法1

解决此问题的最简单方法是在kafka侦听器注解中添加properties参数:

@KafkaListener(topics = ["\${topic-1-name}"], properties = ["bootstrap.servers=\${bootstrap-server-1}"])
fun topic1Listener(@Payload messages: List<String>, ack: Acknowledgment){
    // Do work
}

@KafkaListener(topics = ["\${topic-2-name}"], properties = ["bootstrap.servers=\${bootstrap-server-2}"])
fun topic2Listener(@Payload messages: List<String>, ack: Acknowledgment){
    // Do work
}

我们在properties param value中指定的任何键/值对都将覆盖defaultkafkaconsumerfactory中的默认键/值。在本例中,我们将bootstrap.servers属性重写为每个主题的特定引导服务器地址。
但是,我们仍然可以使用spring-boot的特性,比如自动创建主题和允许spring-boot为我们的应用程序设置组id。我们只需要在application.properties或application.yml文件中保留group id参数。

spring:
  kafka:
    consumer:
      group-id: group-id-of-your-choice

请注意,我们可以为两个消费者使用相同的组id,即使他们可能跨越多个代理。实际上,为整个应用程序设置一个组id是一种很好的做法,这样就可以很容易地监视使用者延迟以及其他度量。
还要注意的是,我们不再将主题名称存储在spring配置部分,我们需要在其他地方执行此操作,因为我们不希望spring boot使用错误的代理地址配置主题。我们在重写属性时让侦听器处理这部分,如上图所示。
有很多其他的方法来实现这一点,但这是最简单的方式,我发现和测试的工作。

方法2

其他方法包括创建您自己的自定义consumerfactory和kafkalistenercontainerfactory对象,然后配置每个工厂中的属性以使用您选择的引导服务器。但是,第一种方法更简洁,使用默认的容器工厂。下面是如何用自己的属性创建自定义工厂。

@Bean
fun ConsumerFactory1(): DefaultKafkaConsumerFactory<String, String> {
        val props = mutableMapOf<String, Any>()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootStrapServers1!!
        props[ConsumerConfig.GROUP_ID_CONFIG] = groupId!!
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        return DefaultKafkaConsumerFactory(props)
}

@Bean
fun ContainerFactory1(): ConcurrentKafkaListenerContainerFactory<String, String>? {
        val factory: ConcurrentKafkaListenerContainerFactory<String, String> = ConcurrentKafkaListenerContainerFactory()
        factory.consumerFactory = ConsumerFactory1()
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
        factory.isBatchListener = true
        return factory
}

@Bean
fun ConsumerFactory2(): DefaultKafkaConsumerFactory<Any?, Any?> {
        val props = mutableMapOf<String, Any>()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootStrapServers2!!
        props[ConsumerConfig.GROUP_ID_CONFIG] = groupId!!
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java

}

@Bean
fun ContainerFactory2(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory: ConcurrentKafkaListenerContainerFactory<String, String> = ConcurrentKafkaListenerContainerFactory()
        factory.consumerFactory = ConsumerFactory2()
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
        factory.isBatchListener = true
        return factory
}

这里有一些东西要打开。
集装箱工厂本质上是一样的,我们只是用它们各自的消费品工厂,每个都有相关的属性。
当我的应用程序将消息作为字符串使用时,我使用了内置的stringdeserializer,然后使用jackson将json字符串序列化为一个对象。应用程序可能需要不同的反序列化程序,甚至需要自定义反序列化程序,具体取决于对主题的数据进行序列化的方式。
将ackmode设置为manual允许我们在确认已使用主题中的消息时进行控制。
将batch listener设置为true允许侦听器分批侦听消息,而不是一次侦听1条消息。
有了这个实现,我们就可以完全从应用程序中剥离spring boot,使用kafka了。所以我们的@listener注解看起来有点不同:

@KafkaListener(topics = ["\${kafka-topic-1}"], containerFactory = "ContainerFactory1", groupId = "\${kafka.group-id}")

我们不再让spring boot为我们配置组id,所以我们现在需要在侦听器中指定它。这意味着在application.properties文件中不再定义spring.kafka.consumer属性,我们需要以编程方式执行此操作。我们现在需要手动配置一些其他的东西,比如在启动时自动配置主题,如果您需要该功能,您需要手动设置kafkaadminbean。

结论

有更多的方法来实现这一点,我知道其他人也提出了很好的解决方案,有时这完全取决于您的应用程序需要什么。这只是我发现的成功解决方案中的两个,方法1很容易理解、实现和测试,而不必太过关注springboot和springkafka的深度!如果您需要这一功能,那么这些方法将不仅仅适用于2个代理。

相关问题