Spring Kafka

kx1ctssn  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(340)

我将spring和springkafka用于一个批处理服务,该服务从kafka收集数据,直到满足某些条件,然后转储数据。
我想在数据离开我的服务时确认提交,但它可能会在内存中保留5-10分钟。
考虑到SpringKafka中的确认实现保留了原始记录,在我转储数据之前保留它们似乎是不合理的,因为这样会影响内存利用率。
在给定分区/偏移信息的情况下,有没有其他方法来确认/提交spring-kafka的偏移?

bmp9r5qi

bmp9r5qi1#

你可以用 AckMode.TIME 或者 AckMode.COUNT 有一个非常大的 ackTime 或者 ackCount 所以容器永远不会进行确认。
然后,通过 Consumer<?, ?> 到侦听器方法中,然后自己执行偏移提交。
但是请注意,使用者不是线程安全的,因此必须在侦听器线程上执行提交。
另外,请记住,记录不是单独确认的,只是偏移量。你不能确认“坏了”。
另外,您可能需要增加 max.poll.interval.ms 超出默认值(5分钟),以避免重新平衡分区。

相关问题