I am trying to read a CSV file from HDFS and then write it into a Hive table, but I get a "Task not serializable" error. The same error occurs whether or not I use broadcast. Can anyone help me?
case class UploadJobInfo(ai: String, stm: Long, timePartition: String, platform: String)

private val info = UploadJobInfo(ai, stm, timePartition, platformPartition)
private val uploadJobInfo: Broadcast[UploadJobInfo] = spark.sparkContext.broadcast(info)

spark.read
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .option("header", true)
  .option("delimiter", ",")
  .csv(csvPath)
  .mapPartitions(partition => {
    val jobInfoValue = uploadJobInfo.value
    partition.map { row =>
      val mp = mutable.Map.empty[String, String]
      val columnName = row.schema.map(_.name).filter(!_.equals("cs1"))
      for (i <- columnName.indices) {
        mp.put(columnName(i), row.getAs[String](i))
      }
      val csid: Integer = null
      val cs1 = row.getAs[String]("cs1")
      (jobInfoValue.ai, "", cs1, csid, jobInfoValue.stm, mp.toMap, "", jobInfoValue.timePartition, jobInfoValue.platform)
    }
  })
  .toDF()
  .createOrReplaceTempView(tem_tableName)

spark.sql(
  s"""
     |insert overwrite table $USER_PROPS partition(time,platform)
     |select * from $tem_tableName
   """.stripMargin)
Error stack trace:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: io.growing.offline.business.statistic.daily.transform.ManualUpLoadUserPropsTask
Serialization stack:
- object not serializable (class: io.growing.offline.business.statistic.daily.transform.ManualUpLoadUserPropsTask, value: io.growing.offline.business.statistic.daily.transform.ManualUpLoadUserPropsTask@722fa16d)
- field (class: io.growing.offline.business.statistic.daily.transform.ManualUpLoadUserPropsTask$$anonfun$compute$1, name: $outer, type: class io.growing.offline.business.statistic.daily.transform.ManualUpLoadUserPropsTask)
- object (class io.growing.offline.business.statistic.daily.transform.ManualUpLoadUserPropsTask$$anonfun$compute$1, <function1>)
- field (class: org.apache.spark.sql.execution.MapPartitionsExec, name: func, type: interface scala.Function1)
- object (class org.apache.spark.sql.execution.MapPartitionsExec, MapPartitions <function1>, obj#29: io.growing.offline.business.statistic.daily.transform.UserProps
+- DeserializeToObject createexternalrow(cs1#10.toString, user_created_at#11.toString, life_cycle_ppl#12.toString, StructField(cs1,StringType,true), StructField(user_created_at,StringType,true), StructField(life_cycle_ppl,StringType,true)), obj#28: org.apache.spark.sql.Row
+- *(1) FileScan csv [cs1#10,user_created_at#11,life_cycle_ppl#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://growingFS/user/douyp/user_props], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<cs1:string,user_created_at:string,life_cycle_ppl:string>
)
- field (class: org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$5, name: $outer, type: class org.apache.spark.sql.execution.MapPartitionsExec)
- object (class org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$5, <function1>)
- field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, name: f$23, type: interface scala.Function1)
- object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, <function0>)
- field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1)
- object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24, <function3>)
- field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3)
- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[12] at insertInto at ManualUserPropsJob.scala:81)
- field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
- object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@228674ad)
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@403d5738)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@228674ad))
- field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[13] at insertInto at ManualUserPropsJob.scala:81)
- field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
- object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@589eec78)
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@66f2f72b)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@589eec78))
- field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[14] at insertInto at ManualUserPropsJob.scala:81)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (MapPartitionsRDD[14] at insertInto at ManualUserPropsJob.scala:81,<function2>))
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1165)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:1069)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1013)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2067)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
... 31 more