当streamlistener花费很长时间(超过 max.poll.interval.ms
)处理一条消息,因此该特定使用者被占用,其他新消息将被分配给其他分区。时间大于 max.poll.interval.ms
,重新平衡发生,同样的情况也会发生在另一个消费者身上。因此,此消息将在所有分区中循环,并继续占用资源。
然而,这种情况并不经常发生,只有一些消息不知何故要花很长时间来处理,这是无法控制的。
我们是否可以提交偏移量并在重新平衡几次后将其扔给dlq?如果是,我们怎么做?如果没有,这种情况应该怎么处理?
1条答案
按热度按时间ztyzrc3y1#
增加的
max.poll.interval.ms
不会对性能产生影响(除非检测到真正死亡的消费者需要更长的时间)。每次处理这个“坏”记录时都要进行重新平衡,这对性能的损害要大得多。
但是,您可以通过自定义
SeekToCurrentErrorHandler
以及一个回收器,如DeadLetterPublishingRecoverer
. 您还需要一个重新平衡侦听器来计算重新平衡,以及一些机制来跨示例共享来自错误处理程序的状态(标准侦听器只将状态保存在内存中)。我觉得挺复杂的。