quarkus与kafka的React式消息传递

yqyhoc1h  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(529)

关闭。这个问题需要更加突出重点。它目前不接受答案。
**想改进这个问题吗?**通过编辑这篇文章更新这个问题,使它只关注一个问题。

六个月前关门了。
改进这个问题
我有两个微服务,一个生产者和一个消费者。制作人每两秒钟向Kafka主题写入一个递增的数字。使用者有两个示例正在运行,使用这些增量。我注意到一些我想解决的奇怪的事情:
当还没有消费者,生产者开始生产的消息被保存在Kafka。当消费者联机时,它不会处理生产者已经生成的现有消息,而是开始消费现在传入的消息。消费者如何处理所有未消费的消息?
当有两个消费者时,我希望两个消费者都能平等消费。现在只有一个消费者得到所有的负荷,而另一个只是坐在那里。如何将负载分散到消费者的数量上?
看起来Kafka保存了所有正在制作的记录,即使它已经被消费者消费了。有什么办法可以防止这种情况发生吗?我找不到好的信息例如致谢。
有人知道这三个问题的答案吗?
使用者配置:

mp.messaging.incoming.stocks.connector=smallrye-kafka
mp.messaging.incoming.stocks.topic=stocks
mp.messaging.incoming.stocks.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.stocks.group.id=test1
mp.messaging.incoming.stocks.auto.offset.reset=earliest

生产者配置:

mp.messaging.outgoing.stock-quote.connector=smallrye-kafka
mp.messaging.outgoing.stock-quote.topic=stocks
mp.messaging.outgoing.stock- 
quote.value.serializer=org.apache.kafka.common.serialization.StringSerializer
woobm2wo

woobm2wo1#

Kafka默认保存所有记录,因为它被分类为事件流引擎(实时消息+存储)。主题中消息保留的默认配置为7天(168小时),可以使用主题配置进行更改 retention.ms=10000 (10秒或任何你想要的值)。
对于并发消费,您需要确保主题是分区的,因为主题分区是kafka中的并行单元。我还记得quarkus/smallrye开发团队正在实施kafka重新平衡监听器。如果这一个与最新的quarkus在一起,那么剩下的部分就是划分你的主题。
对于在将分区分配给使用者之前使用消息,我们需要开发团队的输入,我不能建议这样做。

相关问题