无法重新启动kafka使用者应用程序,由于OffsetAutoFrangeException而失败

3htmauhk  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(660)

目前,我的kafka消费流应用程序正在手动将偏移量提交到kafka中 enable.auto.commit 设置为 false . 我尝试重新启动应用程序时失败,引发以下异常:

org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions:{partition-12=155555555}

假设上述错误是由于消息不存在/由于保留期而删除分区造成的,我尝试了以下方法:
我禁用了手动提交并启用了自动提交( enable.auto.commit=true 以及 auto.offset.reset=earliest )但它还是失败了,出现了同样的错误

org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions:{partition-12=155555555}

请建议重新启动作业的方法,以便它可以成功读取存在消息/分区的正确偏移量

ttygqcqt

ttygqcqt1#

我遇到了同样的问题,在我的应用程序中使用了org.apache.spark.streaming.kafka010包,一开始我接受auto.offset.reset策略没有效果,但是当我读到对象kafkautils中方法fixkafkaparams的描述时,我发现配置已被覆盖。我想它调整executor的配置consumerconfig.auto\u offset\u reset\u config的原因是为了保持驱动程序和executor获得的偏移量一致。

332nm8kg

332nm8kg2#

您正在尝试读取偏移量 155555555 从分区 12 主题 partition ,但最有可能的是,由于您的保留策略,它可能已被删除。
您可以使用kafka streams应用程序重置工具来重置kafka streams应用程序的内部状态,以便它可以从头开始重新处理输入数据

$ bin/kafka-streams-application-reset.sh

Option (* = required)         Description
---------------------         -----------

* --application-id <id>       The Kafka Streams application ID (application.id)

--bootstrap-servers <urls>    Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2
                                (default: localhost:9092)
--intermediate-topics <list>  Comma-separated list of intermediate user topics
--input-topics <list>         Comma-separated list of user input topics
--zookeeper <url>             Format: HOST:POST
                                (default: localhost:2181)

或者使用新的消费者组id启动消费者。

相关问题