使用kafkanativeoffsetmanager时,spring集成kafka消息处理速度慢

oxiaedzo  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(385)

我们在这里的一个新项目中使用了Kafka,取得了很大的成功。在最近的一次切换之前,我们使用kafkatopicoffsetmanager来管理我们的消费者主题偏移。为了不让每个使用者/主题对都有额外的主题并使用burrow或lag监控,我们决定使用最新的kafkanativeoffsetmanager,它使用kafka提供的本机偏移管理。不过,在切换之后,我们注意到目标主题的消息消耗持续滞后。我们知道kafkatopicoffsetmanager并没有发生这种情况,因为在切换之前的几个月我们一直在使用它。我们还进行了并行测试,验证了使用kafkatopicoffsetmanager时,消息的消耗与消息的生成几乎是实时的,而kafkanativeoffsetmanager总是越来越落后。两个偏移管理器都使用默认配置,并且都在处理消息后提交偏移(自动确认)。
所以我真的有两个问题,第一个不是这篇文章的主要问题。
第一个问题是,为什么本机偏移管理比使用主题进行偏移管理慢?
第二个问题是,我们是否可以将si-kafka配置为在成功处理每条消息时不提交偏移量,而是提供不同的策略?我们的想法是,也许我们不应该如此频繁地提交偏移量,而应该将它们作为批处理更新来执行。例如,在成功处理25条消息或30秒后提交偏移量。
谢谢您

lbsnaicq

lbsnaicq1#

当禁用autocommit并接收确认头时,您只需要调用 acknowledge() 一旦你的信息被处理。这假设即使您在不同的线程中处理消息,也会保留对 Acknowledgment 示例,或作为原始的一部分 Message -或者,如果您正在进行转换,那么您正在复制头。但是这个电话需要你的密码。
第二,性能问题——它是由 KafkaNativeOffsetManager 实现使得对代理的调用变得阻塞,相对更昂贵(相对于简单地向压缩的主题发送消息,如 KafkaTopicOffsetManager 做。一般来说,在每条消息之后进行更新是非常昂贵的,在springxd中,我们通过使用https://github.com/spring-projects/spring-xd/blob/master/extensions/spring-xd-extension-kafka/src/main/java/org/springframework/integration/x/kafka/windowingoffsetmanager.java,从而减少有效写入的次数。我想我们可以为spring集成做一些类似的事情。
也就是说:相比之下,10万个更新在9.8秒内完成 KafkaNativeOffsetManager 在0.382秒内 KafkaTopicOffsetManager ,如所示https://github.com/mbogoevici/spring-integration-kafka/blob/perftest/src/test/java/org/springframework/integration/kafka/performance/offsetmanagerperformancetests.java (结果收集在我自己的机器上)。结果可能会有偏差,但仍然显示出很大的差异。在你的工具箱里追踪可以确认结果。

qltillow

qltillow2#

不知道这是怎么回事 KafkaNativeOffsetManager ,将是伟大的,如果你分享一些关于这个问题的调查,一些瓶颈的地方,在我们的代码在jira。
对于延迟补偿提交,我可以建议 autoCommitOffset = falseKafkaMessageDrivenChannelAdapter . 把它送到 channel 消息将被 KafkaHeaders.ACKNOWLEDGMENT 正面标题 DefaultAcknowledgment . 它确实满足了您的要求:

/**
 * Invoked when the message for which the acknowledgment has been created has been processed.
 * Calling this method implies that all the previous messages in the partition have been processed already.
 */
void acknowledge();

相关问题