SpringCloudStream:在处理消息需要很长时间时对streamlistener的正确处理

5rgfhyps  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(461)

当streamlistener花费很长时间(超过 max.poll.interval.ms )处理一条消息,因此该特定使用者被占用,其他新消息将被分配给其他分区。时间大于 max.poll.interval.ms ,重新平衡发生,同样的情况也会发生在另一个消费者身上。因此,此消息将在所有分区中循环,并继续占用资源。
然而,这种情况并不经常发生,只有一些消息不知何故要花很长时间来处理,这是无法控制的。
我们是否可以提交偏移量并在重新平衡几次后将其扔给dlq?如果是,我们怎么做?如果没有,这种情况应该怎么处理?

ztyzrc3y

ztyzrc3y1#

增加的 max.poll.interval.ms 不会对性能产生影响(除非检测到真正死亡的消费者需要更长的时间)。
每次处理这个“坏”记录时都要进行重新平衡,这对性能的损害要大得多。
但是,您可以通过自定义 SeekToCurrentErrorHandler 以及一个回收器,如 DeadLetterPublishingRecoverer . 您还需要一个重新平衡侦听器来计算重新平衡,以及一些机制来跨示例共享来自错误处理程序的状态(标准侦听器只将状态保存在内存中)。
我觉得挺复杂的。

相关问题