Spark UDF error: Task not serializable (Scala)

Asked by kokeuurv on 2023-02-16, tagged Scala

I am working on a Scala Spark project. I want to retrieve the wording that corresponds to a code, using two DataFrames (a sketch with made-up sample data follows the list):

  • The first one has a column of codes whose wording I want to retrieve (the "interests" column).

[screenshot of the first DataFrame]

  • The second one has two columns: the code and the wording for that code.

[screenshot of the second DataFrame]
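
For readers without the screenshots, here is a minimal sketch of what the two DataFrames might look like. The sample values are made up; only the column names ("interests", "Code", "Interest") are taken from the code further down.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// First DataFrame: an array column of codes to translate (hypothetical values).
val df = Seq(
  Seq("IAB1", "sports", "IAB2"),
  Seq("music")
).toDF("interests")

// Second DataFrame: the code -> wording lookup table (hypothetical values).
val codesList = Seq(
  ("IAB1", "Arts & Entertainment"),
  ("IAB2", "Automotive")
).toDF("Code", "Interest")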
Here is what I do:

def CodeToInterest(df: sql.DataFrame, codesList: sql.DataFrame): sql.DataFrame = {
  val spark = SparkSession.builder().getOrCreate()
  import spark.implicits._

  // For each code in the "interests" array column: keep it as-is unless it
  // starts with "IAB"; in that case, look up its wording in codesList by
  // filtering that DataFrame from inside the UDF.
  val transformList = udf((init: Array[String]) => {
    if (init == null) return null
    else init.map((code: String) => {
      if (!code.startsWith("IAB")) code
      else codesList.filter($"Code" === code)
        .first()
        .getAs[String]("Interest")
    })
  }).apply(col("interests"))

  df.withColumn("newInterests", transformList)
}

But I get the following error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:850)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:630)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3389)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2550)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3370)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3370)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:751)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:710)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:719)
    at SassWI.RetrieveData$.main(RetrieveData.scala:46)
    at SassWI.RetrieveData.main(RetrieveData.scala) 
Caused by: java.io.NotSerializableException: java.lang.Object
Serialization stack:
    - object not serializable (class: java.lang.Object, value: java.lang.Object@3aeb267)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 3)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class SassWI.Etl$, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic SassWI/Etl$.$anonfun$CodeToInterest$1:(Ljava/lang/Object;Lorg/apache/spark/sql/Dataset;Lorg/apache/spark/sql/SparkSession;[Ljava/lang/String;)[Ljava/lang/String;, instantiatedMethodType=([Ljava/lang/String;)[Ljava/lang/String;, numCaptured=3])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class SassWI.Etl$$$Lambda$2075/1442249061, SassWI.Etl$$$Lambda$2075/1442249061@4b56b517)
    - element of array (index: 5)
    - array (class [Ljava.lang.Object;, size 6)
    - element of array (index: 1)
    - array (class [Ljava.lang.Object;, size 3)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$1308/216359372, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$1308/216359372@3acc3ee)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
    ... 33 more

Do you know what is causing this error? Thank you.


Answer 1 (bvn4nwqk):

I finally found the solution to this problem. According to the documentation, array columns in a Spark DataFrame are exposed to Scala as mutable.WrappedArray, not Array, which is why the engine could not handle the UDF's Array[String] parameter. In addition, the fix below first collects the lookup DataFrame into a plain Map, so the UDF no longer captures a DataFrame (nor the SparkSession) in its closure, which is what made the task non-serializable.

import scala.collection.mutable
import org.apache.spark.sql.functions.{array_distinct, col, udf}

def CodeToInterest(df: sql.DataFrame, codesList: sql.DataFrame): sql.DataFrame = {
  val spark = SparkSession.builder().getOrCreate()
  import spark.implicits._

  // Collect the (Code, Interest) lookup table into a plain Map on the driver,
  // so the UDF closure only captures a serializable Map, not a DataFrame.
  val codes: Map[String, String] = codesList.as[(String, String)].collect().toMap

  // Spark passes array columns to a Scala UDF as mutable.WrappedArray,
  // not Array, so the parameter has to be declared with that type.
  val transformList = udf((init: mutable.WrappedArray[String]) => {
    if (init == null) init
    else init.map((code: String) => {
      if (!code.startsWith("IAB")) code.toLowerCase()
      else codes(code).toLowerCase()
    })
  }).apply(col("interests"))

  df.withColumn("newInterests", array_distinct(transformList))
}
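
For reference, a quick usage sketch of the fixed function, assuming the hypothetical df and codesList DataFrames sketched earlier in the question:

// Hypothetical call site: apply the lookup and inspect the result.
val result = CodeToInterest(df, codesList)
result.select("interests", "newInterests").show(truncate = false)

Note that collecting the lookup table to the driver only works while codesList is small enough to fit in memory; for a large lookup table, a join (or broadcast join) would be the usual alternative.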

Thanks to everyone who helped!
