当我的@kafkalistener听多个主题时,有人知道如何优先考虑单个Kafka主题吗?
下面是我的代码示例:
@KafkaListener(id = "priority", topics = { "${prio-topic}" }, concurrency = "1", autoStartup = "true")
@KafkaListener(id = "nonPriority", topics = { "${not-prio-topic-1}",
"${not-prio-topic-2}", "${not-prio-topic-3}",
"${not-prio-topic-4}", concurrency = "1", autoStartup = "true")
public synchronized void listenManyEntryTopic(String message) {}
我的问题是我想从这个主题开始读 prio-topic
在其他非里约主题之前。只有当我的 prio-topic
是空的我应该开始消费没有任何特定顺序的其他主题。如有任何提示/建议,我们将不胜感激。谢谢你的帮助!
2条答案
按热度按时间bq3bfh9z1#
kafka中没有区分优先级和非优先级主题消息的功能。为了解决您的问题,解决方案之一是将优先级处理主题与非优先级主题分开,即专用app1只使用优先级主题消息并对其进行处理,而app2同时使用非优先级消息并对其进行处理。
bxjv4tth2#
因为您有两个侦听器容器,所以可以添加
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic
作为方法的参数。然后,当您收到来自优先级主题的消息时,您可以
stop()
另一个侦听器容器(使用KafkaListenerEndpointRegistry
bean),如果它正在运行。配置
idleEventInterval
在主主题容器中添加@EventListner
方法ListenerContainerIdleEvent
s(或an)ApplicationListener
豆)。然后,当检测到空闲的主容器时,可以重新启动非主容器。
编辑
您也可以使用暂停/继续而不是停止/启动。