关闭。这个问题需要更加突出重点。它目前不接受答案。
**想改进这个问题吗?**通过编辑这篇文章更新这个问题,使它只关注一个问题。
六个月前关门了。
改进这个问题
我有两个微服务,一个生产者和一个消费者。制作人每两秒钟向Kafka主题写入一个递增的数字。使用者有两个示例正在运行,使用这些增量。我注意到一些我想解决的奇怪的事情:
当还没有消费者,生产者开始生产的消息被保存在Kafka。当消费者联机时,它不会处理生产者已经生成的现有消息,而是开始消费现在传入的消息。消费者如何处理所有未消费的消息?
当有两个消费者时,我希望两个消费者都能平等消费。现在只有一个消费者得到所有的负荷,而另一个只是坐在那里。如何将负载分散到消费者的数量上?
看起来Kafka保存了所有正在制作的记录,即使它已经被消费者消费了。有什么办法可以防止这种情况发生吗?我找不到好的信息例如致谢。
有人知道这三个问题的答案吗?
使用者配置:
mp.messaging.incoming.stocks.connector=smallrye-kafka
mp.messaging.incoming.stocks.topic=stocks
mp.messaging.incoming.stocks.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.stocks.group.id=test1
mp.messaging.incoming.stocks.auto.offset.reset=earliest
生产者配置:
mp.messaging.outgoing.stock-quote.connector=smallrye-kafka
mp.messaging.outgoing.stock-quote.topic=stocks
mp.messaging.outgoing.stock-
quote.value.serializer=org.apache.kafka.common.serialization.StringSerializer
1条答案
按热度按时间woobm2wo1#
Kafka默认保存所有记录,因为它被分类为事件流引擎(实时消息+存储)。主题中消息保留的默认配置为7天(168小时),可以使用主题配置进行更改
retention.ms=10000
(10秒或任何你想要的值)。对于并发消费,您需要确保主题是分区的,因为主题分区是kafka中的并行单元。我还记得quarkus/smallrye开发团队正在实施kafka重新平衡监听器。如果这一个与最新的quarkus在一起,那么剩下的部分就是划分你的主题。
对于在将分区分配给使用者之前使用消息,我们需要开发团队的输入,我不能建议这样做。