我处理实时应用程序的timeseries数据。所以旧数据没有意义。我只想处理流应用程序启动后收到的数据,而不是以前提交的偏移量。重启后忽略Kafka流应用上旧记录的正确方法是什么?对于kafka消费api,我通常使用 seekToEnd() 方法向前跳到最新记录。对于流是否有等效的机制?我希望避免筛选自上次提交以来的所有消息以忽略旧消息。
seekToEnd()
yzuktlbb1#
您可以使用kafka consumer api和 groupId 同 applicationId 为Kafka流和使用该消费者做一个 seekToEnd() 开始流之前。禁用此特殊使用者的自动提交功能,然后手动提交偏移量 seekToEnd() . 然后尝试启动你的流。确保在提交来自reset consumer的偏移量之前流没有启动。
groupId
applicationId
1条答案
按热度按时间yzuktlbb1#
您可以使用kafka consumer api和
groupId
同applicationId
为Kafka流和使用该消费者做一个seekToEnd()
开始流之前。禁用此特殊使用者的自动提交功能,然后手动提交偏移量seekToEnd()
. 然后尝试启动你的流。确保在提交来自reset consumer的偏移量之前流没有启动。