如何从与不同经纪人相关的多个Kafka主题中消费?我有一个springboot应用程序,它需要使用两个主题,但是这些主题与不同的代理程序相关联。我正在使用带有@listener注解的springkafka,我发现有两种方法可以使用与同一个代理相关联的主题,而不是不同的代理。不幸的是,我在springboot或springkafka文档中没有看到任何关于如何做到这一点的帮助。
wz8daaqr1#
有几种方法可以做到这一点,不幸的是springboot和springkafka没有明确说明实现这一点的最佳实践。也有很多答案,所以解决消费从多个主题与同一个经纪人,它并不总是那么简单。
解决此问题的最简单方法是在kafka侦听器注解中添加properties参数:
@KafkaListener(topics = ["\${topic-1-name}"], properties = ["bootstrap.servers=\${bootstrap-server-1}"]) fun topic1Listener(@Payload messages: List<String>, ack: Acknowledgment){ // Do work } @KafkaListener(topics = ["\${topic-2-name}"], properties = ["bootstrap.servers=\${bootstrap-server-2}"]) fun topic2Listener(@Payload messages: List<String>, ack: Acknowledgment){ // Do work }
我们在properties param value中指定的任何键/值对都将覆盖defaultkafkaconsumerfactory中的默认键/值。在本例中,我们将bootstrap.servers属性重写为每个主题的特定引导服务器地址。但是,我们仍然可以使用spring-boot的特性,比如自动创建主题和允许spring-boot为我们的应用程序设置组id。我们只需要在application.properties或application.yml文件中保留group id参数。
spring: kafka: consumer: group-id: group-id-of-your-choice
请注意,我们可以为两个消费者使用相同的组id,即使他们可能跨越多个代理。实际上,为整个应用程序设置一个组id是一种很好的做法,这样就可以很容易地监视使用者延迟以及其他度量。还要注意的是,我们不再将主题名称存储在spring配置部分,我们需要在其他地方执行此操作,因为我们不希望spring boot使用错误的代理地址配置主题。我们在重写属性时让侦听器处理这部分,如上图所示。有很多其他的方法来实现这一点,但这是最简单的方式,我发现和测试的工作。
其他方法包括创建您自己的自定义consumerfactory和kafkalistenercontainerfactory对象,然后配置每个工厂中的属性以使用您选择的引导服务器。但是,第一种方法更简洁,使用默认的容器工厂。下面是如何用自己的属性创建自定义工厂。
@Bean fun ConsumerFactory1(): DefaultKafkaConsumerFactory<String, String> { val props = mutableMapOf<String, Any>() props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootStrapServers1!! props[ConsumerConfig.GROUP_ID_CONFIG] = groupId!! props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java return DefaultKafkaConsumerFactory(props) } @Bean fun ContainerFactory1(): ConcurrentKafkaListenerContainerFactory<String, String>? { val factory: ConcurrentKafkaListenerContainerFactory<String, String> = ConcurrentKafkaListenerContainerFactory() factory.consumerFactory = ConsumerFactory1() factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL factory.isBatchListener = true return factory } @Bean fun ConsumerFactory2(): DefaultKafkaConsumerFactory<Any?, Any?> { val props = mutableMapOf<String, Any>() props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootStrapServers2!! props[ConsumerConfig.GROUP_ID_CONFIG] = groupId!! props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java } @Bean fun ContainerFactory2(): ConcurrentKafkaListenerContainerFactory<String, String> { val factory: ConcurrentKafkaListenerContainerFactory<String, String> = ConcurrentKafkaListenerContainerFactory() factory.consumerFactory = ConsumerFactory2() factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL factory.isBatchListener = true return factory }
这里有一些东西要打开。集装箱工厂本质上是一样的,我们只是用它们各自的消费品工厂,每个都有相关的属性。当我的应用程序将消息作为字符串使用时,我使用了内置的stringdeserializer,然后使用jackson将json字符串序列化为一个对象。应用程序可能需要不同的反序列化程序,甚至需要自定义反序列化程序,具体取决于对主题的数据进行序列化的方式。将ackmode设置为manual允许我们在确认已使用主题中的消息时进行控制。将batch listener设置为true允许侦听器分批侦听消息,而不是一次侦听1条消息。有了这个实现,我们就可以完全从应用程序中剥离spring boot,使用kafka了。所以我们的@listener注解看起来有点不同:
@KafkaListener(topics = ["\${kafka-topic-1}"], containerFactory = "ContainerFactory1", groupId = "\${kafka.group-id}")
我们不再让spring boot为我们配置组id,所以我们现在需要在侦听器中指定它。这意味着在application.properties文件中不再定义spring.kafka.consumer属性,我们需要以编程方式执行此操作。我们现在需要手动配置一些其他的东西,比如在启动时自动配置主题,如果您需要该功能,您需要手动设置kafkaadminbean。
有更多的方法来实现这一点,我知道其他人也提出了很好的解决方案,有时这完全取决于您的应用程序需要什么。这只是我发现的成功解决方案中的两个,方法1很容易理解、实现和测试,而不必太过关注springboot和springkafka的深度!如果您需要这一功能,那么这些方法将不仅仅适用于2个代理。
1条答案
按热度按时间wz8daaqr1#
有几种方法可以做到这一点,不幸的是springboot和springkafka没有明确说明实现这一点的最佳实践。也有很多答案,所以解决消费从多个主题与同一个经纪人,它并不总是那么简单。
方法1
解决此问题的最简单方法是在kafka侦听器注解中添加properties参数:
我们在properties param value中指定的任何键/值对都将覆盖defaultkafkaconsumerfactory中的默认键/值。在本例中,我们将bootstrap.servers属性重写为每个主题的特定引导服务器地址。
但是,我们仍然可以使用spring-boot的特性,比如自动创建主题和允许spring-boot为我们的应用程序设置组id。我们只需要在application.properties或application.yml文件中保留group id参数。
请注意,我们可以为两个消费者使用相同的组id,即使他们可能跨越多个代理。实际上,为整个应用程序设置一个组id是一种很好的做法,这样就可以很容易地监视使用者延迟以及其他度量。
还要注意的是,我们不再将主题名称存储在spring配置部分,我们需要在其他地方执行此操作,因为我们不希望spring boot使用错误的代理地址配置主题。我们在重写属性时让侦听器处理这部分,如上图所示。
有很多其他的方法来实现这一点,但这是最简单的方式,我发现和测试的工作。
方法2
其他方法包括创建您自己的自定义consumerfactory和kafkalistenercontainerfactory对象,然后配置每个工厂中的属性以使用您选择的引导服务器。但是,第一种方法更简洁,使用默认的容器工厂。下面是如何用自己的属性创建自定义工厂。
这里有一些东西要打开。
集装箱工厂本质上是一样的,我们只是用它们各自的消费品工厂,每个都有相关的属性。
当我的应用程序将消息作为字符串使用时,我使用了内置的stringdeserializer,然后使用jackson将json字符串序列化为一个对象。应用程序可能需要不同的反序列化程序,甚至需要自定义反序列化程序,具体取决于对主题的数据进行序列化的方式。
将ackmode设置为manual允许我们在确认已使用主题中的消息时进行控制。
将batch listener设置为true允许侦听器分批侦听消息,而不是一次侦听1条消息。
有了这个实现,我们就可以完全从应用程序中剥离spring boot,使用kafka了。所以我们的@listener注解看起来有点不同:
我们不再让spring boot为我们配置组id,所以我们现在需要在侦听器中指定它。这意味着在application.properties文件中不再定义spring.kafka.consumer属性,我们需要以编程方式执行此操作。我们现在需要手动配置一些其他的东西,比如在启动时自动配置主题,如果您需要该功能,您需要手动设置kafkaadminbean。
结论
有更多的方法来实现这一点,我知道其他人也提出了很好的解决方案,有时这完全取决于您的应用程序需要什么。这只是我发现的成功解决方案中的两个,方法1很容易理解、实现和测试,而不必太过关注springboot和springkafka的深度!如果您需要这一功能,那么这些方法将不仅仅适用于2个代理。