我正在写一个kafka消费者,为了学习,这次我想到了使用spring实现kafka。到目前为止,我一直在使用javaapi来编写消费者。
我想手动管理偏移量,所以我在SpringKafka包中搜索类似于ConsumerBalanceListener的东西。令我成功的是,我在Spring遇到了ConsumerAreaRebalanceListener,它可以代替ConsumerBalanceListener。
但是当我查看consumerawarerebalancelistener接口时,可以看到两个方法:onpartitionsrevokedbeforecommit和onpartitionsrevokedaftercommit,这在kafkajavaapi中是不可用的。
有人能解释一下我在哪里可以用这种方法吗?
p、 s-看过SpringKafka的实现,但不太明白它在哪里有用。
1条答案
按热度按时间lbsnaicq1#
SpringKafka有一个信息驱动的消费模式;您提供了一个pojo消息监听器,框架执行轮询并将消息传递给监听器,一次传递一条消息,或者一批传递一条消息。
它有各种提交偏移量的模式(它更喜欢关闭)
enable.auto.commmit
在客户端)。手动确认有两种模式
AckMode.MANUAL
以及AckMode.MANUAL_IMMEDIATE
; 在这些模式下,我们传递一个Acknowledgment
对象,然后调用ack.acknowledge()
.当模式为
MANUAL_IMMEDIATE
,只要你呼叫acknowledge()
在使用者线程上,直接调用使用者。当模式为
MANUAL
,将偏移量添加到内部队列中,并在处理轮询结果结束时完成提交。类似地,有几种“自动”确认模式;主要是
RECORD
以及BATCH
当侦听器正常退出时,容器提交偏移量。在记录模式下,提交是在处理每个记录之后发送的;在批处理模式下,提交是在处理所有轮询结果之后完成的。批量提交补偿更有效,但会增加重复交付的风险。
当重新平衡发生时,我们也会提交任何挂起的补偿。
那么,为什么是这两个
onPartitionsRevoked*
方法?使用手动、批处理或其他方法时
AckMode
可能有待定偏移量要提交的,onPartitionsRevokedBeforeCommit()
在提交这些挂起的偏移量之前调用onPartitionsRevokedAfterCommit()
在提交这些偏移量之后调用。所以,
consumer.position()
在每个方法中可能返回不同的结果。大多数人会对
onPartitionsRevokedAfterCommit()
但我们觉得我们应该提供两种选择。如果你使用
AckMode.MANUAL_IMMEDIATE
或者AckMode.RECORD
,应该没有区别,因为不会有挂起的ack。但是,由于侦听器是在使用线程上调用的,因此在轮询期间,只有在使用基于时间或基于计数的方法时才会有真正的区别
AckMode
s。对于其他AckMode,我们已经提交了偏移量。希望这是清楚的。