我们将kafka偏移量存储在db中,用于检查点。这有助于在重新启动应用程序时实现零消息丢失。
在一个场景中,当我们重新启动spark应用程序时,偏移量不在kafka中(由于升级或在docker中运行而被清除)。在这个场景中,spark应用程序抛出错误
java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative
在这种情况下,我们想重新启动应用程序并读取最新偏移量。
try{
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
ssc, kafkaParams, fromOffset, msgHandler)
} catch {case ex: Exception => {
KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
}
}
添加try/catch和从最新偏移量开始在这里没有帮助。因为错误发生在执行器中。有没有办法从spark driver处解决这个问题?
暂无答案!
目前还没有任何答案,快来回答吧!