I've written a simple piece of code in Spark. It takes a file location from a DataFrame column and returns a string indicating whether or not the file exists. But as soon as I run it, it throws a "Task not serializable" exception. Can anyone help me get rid of this error?
import java.time.LocalDate

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object filetospark {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("app1")
      .master("local")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    // FileSystem handle created once on the driver
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

    // Returns "Y" if a file exists at the given path, "N" otherwise
    val path: String => String = (p: String) => {
      if (fs.exists(new Path(p))) "Y" else "N"
    }
    val PATH = udf(path)

    val config_df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("pathlocation")

    val current_date = LocalDate.now()
    val instance_table_df = config_df.withColumn("is_available", PATH(col("file_name")))
    instance_table_df.show()
  }
}
It throws an error like this:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:613)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2545)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2545)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2759)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:255)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:292)
at org.apache.spark.sql.Dataset.show(Dataset.scala:746)
at org.apache.spark.sql.Dataset.show(Dataset.scala:705)
at org.apache.spark.sql.Dataset.show(Dataset.scala:714)
at filetospark$.main(filetospark.scala:40)
at filetospark.main(filetospark.scala)
Caused by: java.io.NotSerializableException: org.apache.hadoop.fs.LocalFileSystem
Serialization stack:
- object not serializable (class: org.apache.hadoop.fs.LocalFileSystem, value: org.apache.hadoop.fs.LocalFileSystem@7fd3fd06)
- field (class: filetospark$$anonfun$1, name: fs$1, type: class org.apache.hadoop.fs.FileSystem)
- object (class filetospark$$anonfun$1, <function1>)
- element of array (index: 4)
- array (class [Ljava.lang.Object;, size 5)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 36 more
That is the error it shows. Can someone help solve this problem?
1 Answer
I'm not sure what happened here. The error is gone now, but my doubt remains: all I did was create the Spark session outside the main function, and it works fine. I still don't know why. If anyone knows, please post it here.
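The usual explanation for this error: fs was a local variable of main, so the UDF closure had to capture it, and org.apache.hadoop.fs.LocalFileSystem is not serializable; the serialization stack above points at exactly that field (name: fs$1). If the FileSystem handle moved to object level together with the session, the closure no longer captures it, because members of a top-level Scala object are accessed statically and each executor JVM initializes its own copy. Another common pattern is to create the FileSystem inside the UDF body, so nothing non-serializable travels with the closure. Below is a minimal sketch of that variant, assuming local mode as in the question; "pathlocation" and "file_name" are the question's placeholders, and a bare new Configuration() would not pick up the driver's Hadoop settings on a real cluster.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object filetospark {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("app1")
      .master("local")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    // The FileSystem is built inside the UDF body, on the executor,
    // so the closure itself contains nothing non-serializable.
    val isAvailable = udf { (p: String) =>
      val fs = FileSystem.get(new Configuration())
      if (fs.exists(new Path(p))) "Y" else "N"
    }

    val config_df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("pathlocation")

    config_df.withColumn("is_available", isAvailable(col("file_name"))).show()
  }
}

FileSystem.get caches instances per scheme and configuration, so calling it once per row stays cheap.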