在Spark with Scala中注册和使用UDF时,我收到了以下错误消息:
原因:java. lang.类转换异常:无法将java. lang. invoke. SerializedLambda的示例分配给字段。
- 背景:**为了创建UDF的基类以创建累加器并度量UDF运行时
我创建了以下基类:
abstract class UDFWithAccumulator[T1, T2](withAccumulator: Boolean) extends Serializable {
val accumulator: LongAccumulator = if (withAccumulator) SparkSession.getDefaultSession.get.sparkContext.longAccumulator(s"${getClass.getSimpleName}Acc") else null
protected def getFunction(): T1 => T2
private val fn: T1 => T2 = getFunction()
def udf(): T1 => T2 = if (withAccumulator) fnWithAccumulator else fn
private def fnWithAccumulator(input: T1): T2 = {
val startTime = System.currentTimeMillis()
val res: T2 = fn(input)
accumulator.reset()
accumulator.add(System.currentTimeMillis() - startTime)
res
}
}
然后按以下方式使用它:
class CreateRulesDataUdf(withAccumulator: Boolean) extends UDFWithAccumulator[mutable.WrappedArray[Row], RulesData](withAccumulator) {
val DEFAULT_RULES_ARR: Array[Rule] = Array()
override protected def getFunction(): mutable.WrappedArray[Row] => RulesData = {}
其中,RulesData定义为:
case class RulesData(rules: mutable.WrappedArray[Rule], botnet_id: String) extends Serializable
UDF注册通过以下方式完成:
session.udf.register(CREATE_RULES_DATA_UDF, new CreateRulesDataUdf(withAccumulator).udf())
调用UDF时,我收到以下错误消息:
原因:java. lang.类转换异常:无法将java. lang. invoke. SerializedLambda的示例分配给字段。
当将基类实现更改为以下代码时,我能够解决这个问题(唯一的更改是我没有使用字段来存储functionRef):
abstract class UDFWithAccumulator[T1, T2](withAccumulator: Boolean) extends Serializable {
val accumulator: LongAccumulator = if (withAccumulator) SparkSession.getDefaultSession.get.sparkContext.longAccumulator(s"${getClass.getSimpleName}Acc") else null
protected def getFunction(): T1 => T2
def udf(): T1 => T2 = if (withAccumulator) fnWithAccumulator else getFunction()
private def fnWithAccumulator(input: T1): T2 = {
val startTime = System.currentTimeMillis()
val res: T2 = fn(input)
accumulator.reset()
accumulator.add(System.currentTimeMillis() - startTime)
res
}
}
- 问题**我试图理解为什么这个变通方法可以解决这个问题,为什么spark无法序列化/反序列化这个函数?
1条答案
按热度按时间qojgxg4l1#
所以,我尝试聊天GPT和他给了我以下的答案:
ClassCastException错误的原因是Spark无法序列化存储在UDFWithAccumulator类的fn字段中的lambda函数对象,这是因为lambda函数是作为java. lang. invoke. SerializedLambda类的示例实现的,而该类在默认情况下是不可序列化的。
当Spark试图序列化UDFWithAccumulator对象时,它包括
序列化过程中的fn字段。由于SerializedLambda不可序列化,因此序列化失败,导致ClassCastException。
在UDFWithAccumulator类的第二个实现中,fn字段被删除,getFunction()方法直接在udf()方法内部调用。这意味着lambda函数不存储为字段,因此不包括在序列化过程中。这消除了序列化错误,并允许成功注册和使用UDF。
为了避免将来出现这个问题,您可以修改您的实现以避免在字段中存储lambda函数,而是在需要函数时直接调用getFunction()方法。此外,您可能需要考虑使用不同的方法来测量UDF运行时,并创建不依赖于在字段中存储lambda函数的累加器。
然后我看到SerializedLambda确实实现了Serializable,所以我再次询问聊天:java.lang.invoke.SerializedLambda已实现为可序列化
他回答说:
java.lang.invoke.SerializedLambda是Serializable接口的实现,这意味着此类的示例可以序列化和反序列化,这一点是正确的。
但是,序列化lambda函数会出现问题,因为SerializedLambda类在默认情况下不被序列化。创建lambda函数时,Java编译器会生成SerializedLambda类的一个示例,该示例表示lambda函数,但在默认情况下该示例不可序列化。
为了序列化一个lambda函数,SerializedLambda示例需要用合适的值初始化,比如lambda函数使用的方法句柄和方法类型。这个初始化过程不是由Java运行时或Spark自动执行的,必须由程序员手动完成。
在Spark UDF的上下文中,这意味着UDF中使用的任何lambda函数在注册为UDF之前都必须正确地初始化和序列化,这可以通过手动序列化lambda函数并存储结果字节来实现,或者使用为lambda函数提供序列化支持的第三方库来实现。
因此,虽然SerializedLambda在技术上是可序列化的,但序列化lambda函数的问题是由于SerializedLambda示例在默认情况下不会自动初始化和序列化。