处理流时丢失kafka偏移量

doinxwow  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(250)

我正在使用kafkautils.createdirectstream kafka api使用消息,然后处理要存储在配置单元中的消息。由于访问配置单元元存储失败,有时作业会失败。重新启动时,我看到它总是从“最新”消息开始读取(因为在我的代码auto.offset.reset=latest中),而不是从最后提交的消息读取。在这种情况下,它正在失去补偿。因此,我希望在整个处理完成后存储偏移量,以便在检索消息失败时检索它。
请建议如何使用kafkautils.createdirectstream将偏移量存储在kafka中,以确保完成整个kafka作业。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题