spark:访问自定义项中的行

0sgqnhkj  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(442)

我有以下自定义项用于将存储为字符串的时间转换为时间戳。

val hmsToTimeStampUdf = udf((dt: String) => {
    if (dt == null) null else {
      val formatter = DateTimeFormat.forPattern("HH:mm:ss")
      try {
        new Timestamp(formatter.parseDateTime(dt).getMillis)
      } catch {
        case t: Throwable => throw  new RuntimeException("hmsToTimeStampUdf,dt="+dt, t)
      }
    }
  })

此自定义项用于转换 String 价值观 Timestamp :

outputDf.withColumn(schemaColumn.name, ymdToTimeStampUdf(col(schemaColumn.name))

但某些csv文件对此列的值无效 RuntimeException . 我想找出哪些行有这些坏记录。是否可以访问自定义项中的行信息?

6tdlim6h

6tdlim6h1#

而不是扔 RuntimeException 这会扼杀你的.csv解析,也许更好的方法是让udf返回一个元组(格式良好,损坏的)值。然后,您可以通过选择 is null / is not null 子集。

def safeConvert(dt: String) : (Timestamp,String) = {
  if (dt == null) 
    (null,null) 
  else {
    val formatter = DateTimeFormat.forPattern("HH:mm:ss")
    try {
      (new Timestamp(formatter.parseDateTime(dt).getMillis),null)
    } catch {
      case e:Exception => 
        (null,dt)
    }
  }
}
val safeConvertUDF = udf(safeConvert(_:String))

val df = Seq(("00:01:02"),("03:04:05"),("67:89:10")).toDF("dt")

df.withColumn("temp",safeConvertUDF($"dt"))
  .withColumn("goodData",$"temp".getItem("_1"))
  .withColumn("badData",$"temp".getItem("_2"))
  .drop($"temp").show(false)
+--------+-------------------+--------+
|dt      |goodData           |badData |
+--------+-------------------+--------+
|00:01:02|1970-01-01 00:01:02|null    |
|03:04:05|1970-01-01 03:04:05|null    |
|67:89:10|null               |67:89:10|
+--------+-------------------+--------+
nukf8bse

nukf8bse2#

您可以将该行作为第二个输入参数添加到自定义项中:

val hmsToTimeStampUdf = udf((dt: String, r: Row) => {
  if (dt == null) null else {
    val formatter = DateTimeFormat.forPattern("HH:mm:ss")
    try {
      new Timestamp(formatter.parseDateTime(dt).getMillis)
    } catch {
      case t: Throwable => {
        println(r) //do some error handling
        null
      }
    }
  }
})

在调用udf时,使用一个结构,将dataframe的所有列作为第二个参数(由于这个答案):

df.withColumn("dt", hmsToTimeStampUdf(col("dt"), struct(df.columns.map(df(_)) : _*)))

相关问题