我正在使用Kafka消息代理来发布和订阅事件。为此使用Spring基础设施。我的要求是我需要创建一个订阅多个主题的消费者。
下面的代码在订阅单个主题时工作得非常好。
@KafkaListener(topics = "com.customer.nike")
public void receive(String payload) {
LOGGER.info("received payload='{}'", payload);
}
但是我想,它应该是订阅一些模式的主题。
@KafkaListener(topics = "com.cusotmer.*.nike")
public void receive(String payload) {
LOGGER.info("received payload='{}'", payload);
}
在这段代码中,* 会不断变化。它可能是一些数字值,如1000、1010等等。为此,我也使用了SpeL。
@KafkaListener(topics = "#{com.cusotmer.*.nike}")
public void receive(String payload) {
LOGGER.info("received payload='{}'", payload);
}
但是这个也不适合我。有人能帮我订阅多个主题吗?
先谢谢你。
3条答案
按热度按时间9lowa7mx1#
我使用
@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}"
,其中kafka.topics
取自我的属性文件,并包含侦听器应该侦听的逗号分隔的主题。但也可以是,在启动期间,u可以添加逻辑以生成所有可能的主题,并将其分配给变量,该变量稍后可以如上所述地使用。
更新:通配符是可能的,因为 Alexandria 评论如下。
c3frrgcw2#
关于订阅多个主题,您可以使用topicPatterns来实现:
此监听器的主题模式。项目可以是'topic pattern'、'property-placeholder key'或'expression'。框架将创建一个容器,该容器订阅与指定模式匹配的所有主题,以获取动态分配的分区。将针对检查时存在的主题定期执行模式匹配。表达式必须解析为主题模式(支持字符串或模式结果类型)。
与topics()和topicPartitions()互斥。
关于对the topic name的编程访问,您可以使用
@Header
注解方法来提取由KafkaHeaders定义的特定头值,在您的示例中为RECEIVED_TOPIC:包含从中接收消息的主题的标头。
gorkyyrv3#
如果我们必须从
application.properties
文件中获取多个主题: