Spark aborts the stream on failure, and I need help recovering from it

wbrvyc0a  posted on 2021-05-18 in Spark

I have a running process that uses the following pseudo code to read from Kafka and then publish to Elasticsearch.

// Imports assumed by this snippet
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, Trigger}
import org.apache.spark.sql.types.StringType
import spark.implicits._

val outStreamES = spark.readStream
  .format("kafka")
  .option("subscribe", topics.keys.mkString(","))
  .options(kafkaConfig)
  .load()
  .select($"key".cast(StringType), $"value".cast(StringType), $"topic")
  // Convert untyped dataframe to dataset
  .as[(String, String, String)]
  // Merge all manifests for vehicle in minibatch
  .groupByKey(_._1)
  // Start of merge
  .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.ProcessingTimeTimeout)(mergeGroup)
  // .select($"key".cast(StringType), from_json($"value", schema).as("manifest"))
  .select($"_1".alias("key"), $"_2".alias("manifest"))

val inStreamManifestMain = outStreamES

inStreamManifestMain
  .select("key", "manifest.*")
  // Convert timestamp columns to strings - avoids conversion to longs otherwise
  .writeStream
  .outputMode("append")
  .format("org.elasticsearch.spark.sql")
  .trigger(Trigger.ProcessingTime(conf.getString("spark.trigger")))
  .option("mode", "DROPMALFORMED")
  .options(configToMap(conf.getObject("esConf")))
  .start()
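For what it's worth, the commented-out from_json line above points at one alternative I have been considering: parse the value against an explicit schema so that malformed JSON comes back null and can be filtered out before the merge. A rough, untested sketch, where manifestSchema and its fields are placeholders for the real schema:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical stand-in for the real manifest schema; field names are placeholders.
val manifestSchema = StructType(Seq(
  StructField("vehicleId", StringType),
  StructField("payload", StringType)
))

val validated = spark.readStream
  .format("kafka")
  .option("subscribe", topics.keys.mkString(","))
  .options(kafkaConfig)
  .load()
  // from_json returns null for records that are not valid JSON, so filtering
  // out the nulls drops malformed records without stopping the stream.
  .select($"key".cast(StringType), from_json($"value".cast(StringType), manifestSchema).as("manifest"))
  .filter($"manifest".isNotNull)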

Inside mergeGroup I have a try/catch that surfaces any bad record that does not match the schema. Is there a way to reject such records instead of killing the whole Spark stream?
Below is the pseudo code for that try/catch. A single bad record causes the stream to fail continuously, and the only way I have found to clear it is to purge the entire topic.

// Assuming json4s; read here is org.json4s.native.Serialization.read
import org.json4s.MappingException
import org.json4s.native.Serialization.read

val manifests = rows.map(r => (
  try {
    read[ProductManifestDocument](r._2)
  } catch {
    case ex: MappingException =>
      throw new MappingException(ex.msg + "\n" + r._2 + " vRECORD FAILED TO MAPv ", ex)
  },
  // all topics
  topics(r._3)
)).toList
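Would something like the following be a reasonable way to do it? A rough, untested sketch that reuses read, rows and topics from the snippet above, and simply logs and drops any record that fails to map:

import scala.util.{Failure, Success, Try}

val manifests = rows.flatMap { r =>
  // Parse defensively: a record that fails to map is logged and skipped,
  // so one bad message no longer fails the whole minibatch.
  Try(read[ProductManifestDocument](r._2)) match {
    case Success(doc) => Some((doc, topics(r._3)))
    case Failure(ex) =>
      println(s"Dropping record that failed to map: ${ex.getMessage}\n${r._2}")
      None
  }
}.toList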

No answers yet.
