重新启动时忽略旧消息

k2fxgqgv  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(360)

我处理实时应用程序的timeseries数据。所以旧数据没有意义。我只想处理流应用程序启动后收到的数据,而不是以前提交的偏移量。重启后忽略Kafka流应用上旧记录的正确方法是什么?
对于kafka消费api,我通常使用 seekToEnd() 方法向前跳到最新记录。对于流是否有等效的机制?我希望避免筛选自上次提交以来的所有消息以忽略旧消息。

yzuktlbb

yzuktlbb1#

您可以使用kafka consumer api和 groupIdapplicationId 为Kafka流和使用该消费者做一个 seekToEnd() 开始流之前。禁用此特殊使用者的自动提交功能,然后手动提交偏移量 seekToEnd() . 然后尝试启动你的流。
确保在提交来自reset consumer的偏移量之前流没有启动。

相关问题