scala SparkSQL:如何处理用户定义函数中的空值?

fslejnso  于 2023-10-18  发布在  Scala
关注(0)|答案(4)|浏览(128)

给定表1,其中一列“x”为String类型。我想创建表2,其中列“y”是“x”中给出的日期字符串的整数表示。

Essential是将null值保留在“y”列中。

表1(数据框df 1):

+----------+
|         x|
+----------+
|2015-09-12|
|2015-09-13|
|      null|
|      null|
+----------+
root
 |-- x: string (nullable = true)

表2(数据框df 2):

+----------+--------+                                                                  
|         x|       y|
+----------+--------+
|      null|    null|
|      null|    null|
|2015-09-12|20150912|
|2015-09-13|20150913|
+----------+--------+
root
 |-- x: string (nullable = true)
 |-- y: integer (nullable = true)

而将列“x”的值转换为列“y”的值的用户定义函数(udf)是:

val extractDateAsInt = udf[Int, String] (
  (d:String) => d.substring(0, 10)
      .filterNot( "-".toSet)
      .toInt )

和作品,处理空值是不可能的。
尽管如此,我还是可以做一些

val extractDateAsIntWithNull = udf[Int, String] (
  (d:String) => 
    if (d != null) d.substring(0, 10).filterNot( "-".toSet).toInt 
    else 1 )

我没有找到通过udfs“产生”null值的方法(当然,因为Int s不能是null)。
我目前创建df 2的解决方案(表2)如下:

// holds data of table 1  
val df1 = ... 

// filter entries from df1, that are not null
val dfNotNulls = df1.filter(df1("x")
  .isNotNull)
  .withColumn("y", extractDateAsInt(df1("x")))
  .withColumnRenamed("x", "right_x")

// create df2 via a left join on df1 and dfNotNull having 
val df2 = df1.join( dfNotNulls, df1("x") === dfNotNulls("right_x"), "leftouter" ).drop("right_x")

问题

  • 目前的解决方案似乎很麻烦(而且可能效率不高)。性能)。有更好的办法吗?
  • @Spark-developers:是否有一个类型NullableInt计划/可用,这样下面的udf是可能的(见代码摘录)?

代码摘录

val extractDateAsNullableInt = udf[NullableInt, String] (
  (d:String) => 
    if (d != null) d.substring(0, 10).filterNot( "-".toSet).toInt 
    else null )
wz3gfoph

wz3gfoph1#

这就是Option派上用场的地方:

val extractDateAsOptionInt = udf((d: String) => d match {
  case null => None
  case s => Some(s.substring(0, 10).filterNot("-".toSet).toInt)
})

或者在一般情况下使其稍微更安全:

import scala.util.Try

val extractDateAsOptionInt = udf((d: String) => Try(
  d.substring(0, 10).filterNot("-".toSet).toInt
).toOption)

所有的功劳都归功于Dmitriy Selivanov,他指出了这个解决方案是一个(缺失的?)编辑here
另一种方法是在UDF之外处理null

import org.apache.spark.sql.functions.{lit, when}
import org.apache.spark.sql.types.IntegerType

val extractDateAsInt = udf(
   (d: String) => d.substring(0, 10).filterNot("-".toSet).toInt
)

df.withColumn("y",
  when($"x".isNull, lit(null))
    .otherwise(extractDateAsInt($"x"))
    .cast(IntegerType)
)
8cdiaqws

8cdiaqws2#

Scala实际上有一个很好的工厂函数Option(),它可以使这更简洁:

val extractDateAsOptionInt = udf((d: String) => 
  Option(d).map(_.substring(0, 10).filterNot("-".toSet).toInt))

在内部,Option对象的apply方法只是为您执行null检查:

def apply[A](x: A): Option[A] = if (x == null) None else Some(x)
kd3sttzy

kd3sttzy3#

补充码

有了@zero323的nice答案,我创建了下面的代码,让用户定义的函数可以像描述的那样处理空值。希望对其他人有帮助!

/**
 * Set of methods to construct [[org.apache.spark.sql.UserDefinedFunction]]s that
 * handle `null` values.
 */
object NullableFunctions {

  import org.apache.spark.sql.functions._
  import scala.reflect.runtime.universe.{TypeTag}
  import org.apache.spark.sql.UserDefinedFunction

  /**
   * Given a function A1 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that
   *   * if fnc input is null, None is returned. This will create a null value in the output Spark column.
   *   * if A1 is non null, Some( f(input) will be returned, thus creating f(input) as value in the output column.
   * @param f function from A1 => RT
   * @tparam RT return type
   * @tparam A1 input parameter type
   * @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour describe above
   */
  def nullableUdf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {
    udf[Option[RT],A1]( (i: A1) => i match {
      case null => None
      case s => Some(f(i))
    })
  }

  /**
   * Given a function A1, A2 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that
   *   * if on of the function input parameters is null, None is returned.
   *     This will create a null value in the output Spark column.
   *   * if both input parameters are non null, Some( f(input) will be returned, thus creating f(input1, input2)
   *     as value in the output column.
   * @param f function from A1 => RT
   * @tparam RT return type
   * @tparam A1 input parameter type
   * @tparam A2 input parameter type
   * @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour describe above
   */
  def nullableUdf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction = {
    udf[Option[RT], A1, A2]( (i1: A1, i2: A2) =>  (i1, i2) match {
      case (null, _) => None
      case (_, null) => None
      case (s1, s2) => Some((f(s1,s2)))
    } )
  }
}
jmo0nnb3

jmo0nnb34#

使用coalesce函数和col参数来提供一个默认值,如果为null

  • 返回第一个不为null的列,如果所有输入都为null,则返回null。
    • 例如,如果a不为null,coalesce(a, b, c)将返回a,
  • 或者如果a为空且B不为空则为B,或者如果a和B都为空但c不为空则为c。

yourUDF(coalesce(col(parameter1),lit(defaultValueForParameter 1)

相关问题