I'm working on a Scala Spark project and I want to retrieve the label that corresponds to a code, using two DataFrames:
- the first one has a column of codes for which I want to look up the label (the interests column);
- the second one has two columns: the code and the label for that code.
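For illustration only, here is a minimal sketch of the kind of data I mean (the id column and the concrete code/label values are hypothetical; only the column names interests, Code and Interest match my real data):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// DataFrame whose "interests" column holds an array of codes to translate
val df = Seq(
  (1, Array("IAB1", "other")),
  (2, Array("IAB2"))
).toDF("id", "interests")

// lookup DataFrame mapping each code to its label
val codesList = Seq(
  ("IAB1", "Arts & Entertainment"),
  ("IAB2", "Automotive")
).toDF("Code", "Interest")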
Here is what I do:
def CodeToInterest(df: sql.DataFrame, codesList: sql.DataFrame): sql.DataFrame = {
  val spark = SparkSession.builder().getOrCreate()
  import spark.implicits._

  val transformList = udf((init: Array[String]) => {
    if (init == null) return null
    else init.map((code: String) => {
      if (!code.startsWith("IAB")) code
      else codesList.filter($"Code" === code)
        .first()
        .getAs[String]("Interest")
    })
  }).apply(col("interests"))

  df.withColumn("newInterests", transformList)
}
But I get this error:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:850)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:630)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3389)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2550)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3370)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3370)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
at org.apache.spark.sql.Dataset.show(Dataset.scala:751)
at org.apache.spark.sql.Dataset.show(Dataset.scala:710)
at org.apache.spark.sql.Dataset.show(Dataset.scala:719)
at SassWI.RetrieveData$.main(RetrieveData.scala:46)
at SassWI.RetrieveData.main(RetrieveData.scala)
Caused by: java.io.NotSerializableException: java.lang.Object
Serialization stack:
- object not serializable (class: java.lang.Object, value: java.lang.Object@3aeb267)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 3)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class SassWI.Etl$, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic SassWI/Etl$.$anonfun$CodeToInterest$1:(Ljava/lang/Object;Lorg/apache/spark/sql/Dataset;Lorg/apache/spark/sql/SparkSession;[Ljava/lang/String;)[Ljava/lang/String;, instantiatedMethodType=([Ljava/lang/String;)[Ljava/lang/String;, numCaptured=3])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class SassWI.Etl$$$Lambda$2075/1442249061, SassWI.Etl$$$Lambda$2075/1442249061@4b56b517)
- element of array (index: 5)
- array (class [Ljava.lang.Object;, size 6)
- element of array (index: 1)
- array (class [Ljava.lang.Object;, size 3)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$1308/216359372, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$1308/216359372@3acc3ee)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 33 more
Do you know what is causing this error? Thanks.
1 Answer
I finally found the solution to this problem. According to the documentation, an array column in a Spark DataFrame is exposed to Scala code as a scala.collection.mutable.WrappedArray, not as an Array. That is why the engine cannot cast the column to the Array[String] parameter my UDF declares!
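A minimal sketch of a fix along these lines (not necessarily the exact code that was used): the UDF parameter is declared as Seq[String], which WrappedArray satisfies, and, as an additional assumption, the small lookup DataFrame is collected into a plain Map on the driver so the UDF no longer captures a DataFrame, which is not serializable:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

def codeToInterest(df: DataFrame, codesList: DataFrame): DataFrame = {
  // Collect the (small) lookup table into a plain Scala Map on the driver,
  // so the UDF closure only captures serializable values, not a DataFrame.
  val codeToLabel: Map[String, String] = codesList
    .select("Code", "Interest")
    .collect()
    .map(row => row.getString(0) -> row.getString(1))
    .toMap

  // Spark hands an array column to a Scala UDF as a WrappedArray,
  // which is a Seq but not an Array, hence the Seq[String] parameter.
  val transformList = udf { (codes: Seq[String]) =>
    if (codes == null) null
    else codes.map(code =>
      if (!code.startsWith("IAB")) code
      else codeToLabel.getOrElse(code, code)
    )
  }

  df.withColumn("newInterests", transformList(col("interests")))
}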
Thanks to everyone who helped!