Spark UDF序列化失败

iyfjxgzm  于 2023-02-24  发布在  Apache
关注(0)|答案(1)|浏览(261)

在Spark with Scala中注册和使用UDF时,我收到了以下错误消息:
原因:java. lang.类转换异常:无法将java. lang. invoke. SerializedLambda的示例分配给字段。

    • 背景:**为了创建UDF的基类以创建累加器并度量UDF运行时

我创建了以下基类:

abstract class UDFWithAccumulator[T1, T2](withAccumulator: Boolean) extends Serializable {

    val accumulator: LongAccumulator = if (withAccumulator) SparkSession.getDefaultSession.get.sparkContext.longAccumulator(s"${getClass.getSimpleName}Acc") else null

    protected def getFunction(): T1 => T2

    private val fn: T1 => T2 = getFunction()

  def udf(): T1 => T2 = if (withAccumulator) fnWithAccumulator else fn
  private def fnWithAccumulator(input: T1): T2 = {
    val startTime = System.currentTimeMillis()
    val res: T2 = fn(input)
    accumulator.reset()
    accumulator.add(System.currentTimeMillis() - startTime)
    res
  }

}

然后按以下方式使用它:

class CreateRulesDataUdf(withAccumulator: Boolean) extends UDFWithAccumulator[mutable.WrappedArray[Row], RulesData](withAccumulator) {
    val DEFAULT_RULES_ARR: Array[Rule] = Array()

    override protected def getFunction(): mutable.WrappedArray[Row] => RulesData = {}

其中,RulesData定义为:

case class RulesData(rules: mutable.WrappedArray[Rule], botnet_id: String) extends Serializable

UDF注册通过以下方式完成:

session.udf.register(CREATE_RULES_DATA_UDF, new CreateRulesDataUdf(withAccumulator).udf())

调用UDF时,我收到以下错误消息:
原因:java. lang.类转换异常:无法将java. lang. invoke. SerializedLambda的示例分配给字段。
当将基类实现更改为以下代码时,我能够解决这个问题(唯一的更改是我没有使用字段来存储functionRef):

abstract class UDFWithAccumulator[T1, T2](withAccumulator: Boolean) extends Serializable {

  val accumulator: LongAccumulator = if (withAccumulator) SparkSession.getDefaultSession.get.sparkContext.longAccumulator(s"${getClass.getSimpleName}Acc") else null

  protected def getFunction(): T1 => T2
  def udf(): T1 => T2 = if (withAccumulator) fnWithAccumulator else getFunction()
  private def fnWithAccumulator(input: T1): T2 = {
    val startTime = System.currentTimeMillis()
    val res: T2 = fn(input)
    accumulator.reset()
    accumulator.add(System.currentTimeMillis() - startTime)
    res
  }

}
    • 问题**我试图理解为什么这个变通方法可以解决这个问题,为什么spark无法序列化/反序列化这个函数?
qojgxg4l

qojgxg4l1#

所以,我尝试聊天GPT和他给了我以下的答案:
ClassCastException错误的原因是Spark无法序列化存储在UDFWithAccumulator类的fn字段中的lambda函数对象,这是因为lambda函数是作为java. lang. invoke. SerializedLambda类的示例实现的,而该类在默认情况下是不可序列化的。
当Spark试图序列化UDFWithAccumulator对象时,它包括
序列化过程中的fn字段。由于SerializedLambda不可序列化,因此序列化失败,导致ClassCastException。
在UDFWithAccumulator类的第二个实现中,fn字段被删除,getFunction()方法直接在udf()方法内部调用。这意味着lambda函数不存储为字段,因此不包括在序列化过程中。这消除了序列化错误,并允许成功注册和使用UDF。
为了避免将来出现这个问题,您可以修改您的实现以避免在字段中存储lambda函数,而是在需要函数时直接调用getFunction()方法。此外,您可能需要考虑使用不同的方法来测量UDF运行时,并创建不依赖于在字段中存储lambda函数的累加器。
然后我看到SerializedLambda确实实现了Serializable,所以我再次询问聊天:java.lang.invoke.SerializedLambda已实现为可序列化
他回答说:
java.lang.invoke.SerializedLambda是Serializable接口的实现,这意味着此类的示例可以序列化和反序列化,这一点是正确的。
但是,序列化lambda函数会出现问题,因为SerializedLambda类在默认情况下不被序列化。创建lambda函数时,Java编译器会生成SerializedLambda类的一个示例,该示例表示lambda函数,但在默认情况下该示例不可序列化。
为了序列化一个lambda函数,SerializedLambda示例需要用合适的值初始化,比如lambda函数使用的方法句柄和方法类型。这个初始化过程不是由Java运行时或Spark自动执行的,必须由程序员手动完成。
在Spark UDF的上下文中,这意味着UDF中使用的任何lambda函数在注册为UDF之前都必须正确地初始化和序列化,这可以通过手动序列化lambda函数并存储结果字节来实现,或者使用为lambda函数提供序列化支持的第三方库来实现。
因此,虽然SerializedLambda在技术上是可序列化的,但序列化lambda函数的问题是由于SerializedLambda示例在默认情况下不会自动初始化和序列化。

相关问题