我们在这里的一个新项目中使用了Kafka,取得了很大的成功。在最近的一次切换之前,我们使用kafkatopicoffsetmanager来管理我们的消费者主题偏移。为了不让每个使用者/主题对都有额外的主题并使用burrow或lag监控,我们决定使用最新的kafkanativeoffsetmanager,它使用kafka提供的本机偏移管理。不过,在切换之后,我们注意到目标主题的消息消耗持续滞后。我们知道kafkatopicoffsetmanager并没有发生这种情况,因为在切换之前的几个月我们一直在使用它。我们还进行了并行测试,验证了使用kafkatopicoffsetmanager时,消息的消耗与消息的生成几乎是实时的,而kafkanativeoffsetmanager总是越来越落后。两个偏移管理器都使用默认配置,并且都在处理消息后提交偏移(自动确认)。
所以我真的有两个问题,第一个不是这篇文章的主要问题。
第一个问题是,为什么本机偏移管理比使用主题进行偏移管理慢?
第二个问题是,我们是否可以将si-kafka配置为在成功处理每条消息时不提交偏移量,而是提供不同的策略?我们的想法是,也许我们不应该如此频繁地提交偏移量,而应该将它们作为批处理更新来执行。例如,在成功处理25条消息或30秒后提交偏移量。
谢谢您
2条答案
按热度按时间lbsnaicq1#
当禁用autocommit并接收确认头时,您只需要调用
acknowledge()
一旦你的信息被处理。这假设即使您在不同的线程中处理消息,也会保留对Acknowledgment
示例,或作为原始的一部分Message
-或者,如果您正在进行转换,那么您正在复制头。但是这个电话需要你的密码。第二,性能问题——它是由
KafkaNativeOffsetManager
实现使得对代理的调用变得阻塞,相对更昂贵(相对于简单地向压缩的主题发送消息,如KafkaTopicOffsetManager
做。一般来说,在每条消息之后进行更新是非常昂贵的,在springxd中,我们通过使用https://github.com/spring-projects/spring-xd/blob/master/extensions/spring-xd-extension-kafka/src/main/java/org/springframework/integration/x/kafka/windowingoffsetmanager.java,从而减少有效写入的次数。我想我们可以为spring集成做一些类似的事情。也就是说:相比之下,10万个更新在9.8秒内完成
KafkaNativeOffsetManager
在0.382秒内KafkaTopicOffsetManager
,如所示https://github.com/mbogoevici/spring-integration-kafka/blob/perftest/src/test/java/org/springframework/integration/kafka/performance/offsetmanagerperformancetests.java (结果收集在我自己的机器上)。结果可能会有偏差,但仍然显示出很大的差异。在你的工具箱里追踪可以确认结果。qltillow2#
不知道这是怎么回事
KafkaNativeOffsetManager
,将是伟大的,如果你分享一些关于这个问题的调查,一些瓶颈的地方,在我们的代码在jira。对于延迟补偿提交,我可以建议
autoCommitOffset = false
上KafkaMessageDrivenChannelAdapter
. 把它送到channel
消息将被KafkaHeaders.ACKNOWLEDGMENT
正面标题DefaultAcknowledgment
. 它确实满足了您的要求: