Kafka重新启动时没有得到范围内的偏移

2hh7jdfx  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(236)

我们将kafka偏移量存储在db中,用于检查点。这有助于在重新启动应用程序时实现零消息丢失。
在一个场景中,当我们重新启动spark应用程序时,偏移量不在kafka中(由于升级或在docker中运行而被清除)。在这个场景中,spark应用程序抛出错误

java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative

在这种情况下,我们想重新启动应用程序并读取最新偏移量。

try{
 KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
    ssc, kafkaParams, fromOffset, msgHandler)
} catch {case ex: Exception => {
KafkaUtils.createDirectStream[String, String,
    StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
 }
}

添加try/catch和从最新偏移量开始在这里没有帮助。因为错误发生在执行器中。有没有办法从spark driver处解决这个问题?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题