我将spring和springkafka用于一个批处理服务,该服务从kafka收集数据,直到满足某些条件,然后转储数据。我想在数据离开我的服务时确认提交,但它可能会在内存中保留5-10分钟。考虑到SpringKafka中的确认实现保留了原始记录,在我转储数据之前保留它们似乎是不合理的,因为这样会影响内存利用率。在给定分区/偏移信息的情况下,有没有其他方法来确认/提交spring-kafka的偏移?
bmp9r5qi1#
你可以用 AckMode.TIME 或者 AckMode.COUNT 有一个非常大的 ackTime 或者 ackCount 所以容器永远不会进行确认。然后,通过 Consumer<?, ?> 到侦听器方法中,然后自己执行偏移提交。但是请注意,使用者不是线程安全的,因此必须在侦听器线程上执行提交。另外,请记住,记录不是单独确认的,只是偏移量。你不能确认“坏了”。另外,您可能需要增加 max.poll.interval.ms 超出默认值(5分钟),以避免重新平衡分区。
AckMode.TIME
AckMode.COUNT
ackTime
ackCount
Consumer<?, ?>
max.poll.interval.ms
1条答案
按热度按时间bmp9r5qi1#
你可以用
AckMode.TIME
或者AckMode.COUNT
有一个非常大的ackTime
或者ackCount
所以容器永远不会进行确认。然后,通过
Consumer<?, ?>
到侦听器方法中,然后自己执行偏移提交。但是请注意,使用者不是线程安全的,因此必须在侦听器线程上执行提交。
另外,请记住,记录不是单独确认的,只是偏移量。你不能确认“坏了”。
另外,您可能需要增加
max.poll.interval.ms
超出默认值(5分钟),以避免重新平衡分区。