我们是否应该使用max.poll.records或max.poll.interval.ms来处理kafka consumer中处理时间较长的记录?

kmynzznz  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(2436)

我想知道在kafka consumer中处理耗时较长的记录的更好选择是什么?我运行了一些测试来理解这一点,并观察到我们可以通过修改 max.poll.records 或者 max.poll.interval.ms .
现在我的问题是,什么是更好的选择?请建议。

eiee3dmh

eiee3dmh1#

max.poll.records 简单地定义在单个调用中返回的最大记录数 poll() .
现在 max.poll.interval.ms 定义对的调用之间的延迟 poll() . max.poll.interval.ms: 调用的最大延迟 poll() 使用消费群管理时。这为消费者在获取更多记录之前可以空闲的时间量设置了一个上限。如果 poll() 在此超时过期之前未调用,则认为使用者失败,组将重新平衡,以便将分区重新分配给另一个成员。对于使用非空 group.instance.id 如果达到此超时,则不会立即重新分配分区。相反,使用者将停止发送心跳,分区将在过期后重新分配 session.timeout.ms . 这反映了已关闭的静态使用者的行为。
我相信你可以调整两者,以达到预期的行为。例如,您可以计算消息的平均处理时间。如果平均处理时间是1秒 max.poll.records=100 然后您应该为轮询间隔留出大约100多秒的时间。

anauzrmj

anauzrmj2#

如果您的处理速度很慢,因此希望避免重新平衡,那么调整两者都可以实现这一点。但是,扩展max.poll.interval.ms以允许轮询之间有更长的间隔确实有点副作用。
每个使用者只使用2个线程-轮询线程和心跳线程。
后者让组知道您的应用程序仍处于活动状态,因此可以在max.poll.interval.ms过期之前触发重新平衡,它还可以在您处理以前轮询的批处理时预取记录。
轮询线程执行组通信方面的所有其他操作,因此在轮询方法期间,您将发现是否在其他位置触发了重新平衡,是否有分区引线已死亡,因此需要刷新元数据。这意味着,如果允许两次轮询之间的间隔更长,则整个组对更改的响应速度较慢(例如,在重新平衡之后,直到所有用户都收到新分区之后,才开始接收消息—如果重新平衡发生在一个用户刚开始处理批处理10分钟之后,则所有用户都会收到消息)消费者将在这里逗留至少那么长时间)。
因此,对于响应速度更快的组,在消息处理速度可能较慢的情况下,您应该选择减少在每个批中获取的记录。

相关问题