我们使用kafka流媒体库为kafka主题的传入消息构建实时通知类系统,因此流媒体应用程序在运行时,会实时处理一个主题中的所有传入消息,并在遇到某种预定义的传入消息时发送通知。
如果流应用程序关闭并再次启动,我们只需要处理流应用程序初始化后收到的最新消息。这是为了避免处理流应用程序未运行或关闭时未处理的旧记录。默认情况下,流应用程序开始处理自上次提交偏移量以来的旧消息。Kafka流媒体应用程序中是否有任何设置只允许处理最新消息?
我们使用kafka流媒体库为kafka主题的传入消息构建实时通知类系统,因此流媒体应用程序在运行时,会实时处理一个主题中的所有传入消息,并在遇到某种预定义的传入消息时发送通知。
如果流应用程序关闭并再次启动,我们只需要处理流应用程序初始化后收到的最新消息。这是为了避免处理流应用程序未运行或关闭时未处理的旧记录。默认情况下,流应用程序开始处理自上次提交偏移量以来的旧消息。Kafka流媒体应用程序中是否有任何设置只允许处理最新消息?
2条答案
按热度按时间djp7away1#
kafkaconsumer的“auto.offset.reset”默认值为“latest”,但要使用kafkastreams,默认值为“earliest”引用:https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/streamsconfig.java#l634
因此,如果set auto.offset.reset是“最新的”,它将是您想要的。
olhwl3o22#
你的假设是正确的。即使你的一套
auto.offset.reset
至latest
,你的应用程序已经有消费者补偿。因此,必须使用
kafka-consumer-groups
使用这些选项的命令--reset-offsets --to-latest --execute
.检查不同的重置方案,你甚至可以重置到一个特定的日期时间,或按期间,从一个文件等。。