我试图让我的消费者动态更新其消费。
让我给你举一个更具体的例子,用动物。假设我有一家宠物店,每个主题都是一种动物(例如狗、猫、鱼)。我的kafka消费者的主要职责是获取kafka中的任何日志/记录/消息,并将它们存储到数据库中。
假设我的消费者正在积极消费 dogs
以及 cats
主题和一切工作正常,现在有一种新的动物进入商店和一个新的主题是在Kafka集群生成。如何通知消费者已添加新主题?
我有两个建议,我想看看你认为哪一个更好?或者如果有更好的第三种选择,请告诉我。
1.)生产者向消费者发送一个http请求,让消费者知道生产者将要创建一个新主题,以便消费者可以相应地采取行动。这种方法的问题是,存在竞争条件。消费者有可能在主题创建之前就尝试消费(我发现如果我有 auto.topic.creation.enable
如果设置为true,则竞赛条件实际上不是问题。)
2.)创建一个名为 topic_updates
在Kafka星系群里。因此,每当制作者成功地向Kafka集群提交了一条消息时,它就会通过这个集群广播新闻 topic_updates
,也许一个简单的字符串就可以了。消费者正在积极收听此主题的更新。
3.)我不知道,理想情况下,我希望Kafka在创建新主题时能够发出一个事件。
先谢谢你
2条答案
按热度按时间bxgwgixi1#
您可以使用新的kafkaadminclient,以某种方式监视主题列表并检查新添加的内容。下面是一个示例代码,它为您提供了主题列表(不包括内部主题):
oaxa6hgo2#
消费者能够自动找到新创建的主题,您可以通过调用
consumer.subscribe(Pattern.compile(".*"));
可以降下来metadata.max.age.ms
让消费者更快地了解新主题。