我希望spark在保存到数据库时忽略坏记录

4szc88ey  于 2021-05-16  发布在  Spark
关注(0)|答案(1)|浏览(475)

我正在使用sparkjdbc保存数据库中的行。数据的保存工作正常。
问题:如果spark遇到任何错误记录(例如,当表需要非空值时,列具有空值),它将中止保存
我想要的是:我想要spark忽略坏的行并继续保存下一行。如何做到这一点?我在文档中看不到太多。使用 StructType 不是一个选择。
有指针吗?
我的代码是这样的。

class DatabaseWriter {

  def writeData(dataFrameTobeWritten: DataFrame, schema: String, targetTableName: String, sparkSession: SparkSession): Unit = {
    val dbProperties = getSQLProperties(sparkSession, configurationProp)

    dataFrameTobeWritten.write.mode(SaveMode.Overwrite)
        .option("driver", dbProperties.driverName)
        .option("truncate", "true")
        .option("batchsize", configurationProp.WriterBatchSize())
        .jdbc(dbProperties.jdbcUrl, configurationProp.sqlServerSchema(schema) + "." + targetTableName, dbProperties.connectionProp)
  }
}
i7uq4tfw

i7uq4tfw1#

在方法中添加非空列的列表,并使用它们创建筛选条件以筛选出坏行

class DatabaseWriter {

  def writeData(dataFrameTobeWritten: DataFrame, schema: String, targetTableName: String, sparkSession: SparkSession, notNullColumns : List[String]): Unit = {
    val dbProperties = getSQLProperties(sparkSession, configurationProp)
    val filterCondition = notNullColumns.map(c -> s"$c IS NOT NULL").mkString(" AND ")
    dataFrameTobeWritten.filter(filterCondition).write.mode(SaveMode.Overwrite)
    .option("driver", dbProperties.driverName)
    .option("truncate", "true")
    .option("batchsize", configurationProp.WriterBatchSize())
    .jdbc(dbProperties.jdbcUrl, configurationProp.sqlServerSchema(schema) + "." + targetTableName, dbProperties.connectionProp)
  }
}

相关问题