onpartitionsrevokedbeforecommit与onpartitionsrevokedbeforcommit

mklgxw1f  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(449)

我正在写一个kafka消费者,为了学习,这次我想到了使用spring实现kafka。到目前为止,我一直在使用javaapi来编写消费者。
我想手动管理偏移量,所以我在SpringKafka包中搜索类似于ConsumerBalanceListener的东西。令我成功的是,我在Spring遇到了ConsumerAreaRebalanceListener,它可以代替ConsumerBalanceListener。
但是当我查看consumerawarerebalancelistener接口时,可以看到两个方法:onpartitionsrevokedbeforecommit和onpartitionsrevokedaftercommit,这在kafkajavaapi中是不可用的。
有人能解释一下我在哪里可以用这种方法吗?
p、 s-看过SpringKafka的实现,但不太明白它在哪里有用。

lbsnaicq

lbsnaicq1#

SpringKafka有一个信息驱动的消费模式;您提供了一个pojo消息监听器,框架执行轮询并将消息传递给监听器,一次传递一条消息,或者一批传递一条消息。
它有各种提交偏移量的模式(它更喜欢关闭) enable.auto.commmit 在客户端)。
手动确认有两种模式 AckMode.MANUAL 以及 AckMode.MANUAL_IMMEDIATE ; 在这些模式下,我们传递一个 Acknowledgment 对象,然后调用 ack.acknowledge() .
当模式为 MANUAL_IMMEDIATE ,只要你呼叫 acknowledge() 在使用者线程上,直接调用使用者。
当模式为 MANUAL ,将偏移量添加到内部队列中,并在处理轮询结果结束时完成提交。
类似地,有几种“自动”确认模式;主要是 RECORD 以及 BATCH 当侦听器正常退出时,容器提交偏移量。在记录模式下,提交是在处理每个记录之后发送的;在批处理模式下,提交是在处理所有轮询结果之后完成的。
批量提交补偿更有效,但会增加重复交付的风险。
当重新平衡发生时,我们也会提交任何挂起的补偿。
那么,为什么是这两个 onPartitionsRevoked* 方法?
使用手动、批处理或其他方法时 AckMode 可能有待定偏移量要提交的, onPartitionsRevokedBeforeCommit() 在提交这些挂起的偏移量之前调用 onPartitionsRevokedAfterCommit() 在提交这些偏移量之后调用。
所以, consumer.position() 在每个方法中可能返回不同的结果。
大多数人会对 onPartitionsRevokedAfterCommit() 但我们觉得我们应该提供两种选择。
如果你使用 AckMode.MANUAL_IMMEDIATE 或者 AckMode.RECORD ,应该没有区别,因为不会有挂起的ack。
但是,由于侦听器是在使用线程上调用的,因此在轮询期间,只有在使用基于时间或基于计数的方法时才会有真正的区别 AckMode s。对于其他AckMode,我们已经提交了偏移量。
希望这是清楚的。

相关问题