Spark "Task not serializable" when reading a CSV from HDFS and writing it to Hive

ctehm74n · posted 2021-07-12 in Spark

I am trying to read a CSV file from HDFS and write it into a Hive table, but I get a "Task not serializable" error. The same error occurs whether or not I use a broadcast variable. Can anyone help?

// Imports needed by this snippet (the enclosing class and its other members are omitted)
import org.apache.spark.broadcast.Broadcast
import scala.collection.mutable
import spark.implicits._ // tuple Encoder for mapPartitions / toDF

case class UploadJobInfo(ai: String, stm: Long, timePartition: String, platform: String)
private val info = UploadJobInfo(ai, stm, timePartition, platformPartition)
private val uploadJobInfo: Broadcast[UploadJobInfo] = spark.sparkContext.broadcast(info)

spark.read
      .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
      .option("header", true)
      .option("delimiter", ",")
      .csv(csvPath)
      .mapPartitions(partition => {
        val jobInfoValue = uploadJobInfo.value
        partition.map { row =>
          val mp = mutable.Map.empty[String, String]
          // Collect every column except cs1 into the properties map, looking values up
          // by name so the filtered index cannot drift from the row's own positions.
          val columnNames = row.schema.fieldNames.filterNot(_ == "cs1")
          for (name <- columnNames) {
            mp.put(name, row.getAs[String](name))
          }
          val csid: Integer = null
          val cs1 = row.getAs[String]("cs1")

          (jobInfoValue.ai, "", cs1, csid, jobInfoValue.stm, mp.toMap, "", jobInfoValue.timePartition, jobInfoValue.platform)
        }
      })
      .toDF()
      .createOrReplaceTempView(tem_tableName)

    spark.sql(
      s"""
         |insert overwrite table $USER_PROPS partition(time,platform)
         |select * from $tem_tableName
      """.stripMargin)

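From the serialization stack below, the anonymous function passed to mapPartitions carries a $outer reference to ManualUpLoadUserPropsTask, which is not serializable, so the whole enclosing object gets pulled into the task. A minimal sketch of the usual workaround, assuming the snippet above lives inside that class and that uploadJobInfo is one of its fields: copy the broadcast handle into a local val right before the call, so the closure captures only the local instead of `this`. If that is not enough (for example because UploadJobInfo itself is defined inside the task class), making the class serializable is the other common option, sketched after the stack trace.

// A minimal sketch of one common workaround, not the original job. Only two things change:
// the Broadcast handle is copied into a local val, and row values are read by column name.
// spark, csvPath, tem_tableName and UploadJobInfo are assumed to come from the snippet above.
val jobInfoBc = uploadJobInfo   // local copy: the closure captures this val, not `this`

spark.read
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .option("header", true)
  .option("delimiter", ",")
  .csv(csvPath)
  .mapPartitions { partition =>
    val jobInfoValue = jobInfoBc.value   // resolve the broadcast once per partition
    partition.map { row =>
      val names = row.schema.fieldNames.filterNot(_ == "cs1")
      val props = names.map(n => n -> row.getAs[String](n)).toMap
      val csid: Integer = null
      (jobInfoValue.ai, "", row.getAs[String]("cs1"), csid,
        jobInfoValue.stm, props, "", jobInfoValue.timePartition, jobInfoValue.platform)
    }
  }
  .toDF()
  .createOrReplaceTempView(tem_tableName)
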
Error stack trace:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: io.growing.offline.business.statistic.daily.transform.ManualUpLoadUserPropsTask
Serialization stack:
  - object not serializable (class: io.growing.offline.business.statistic.daily.transform.ManualUpLoadUserPropsTask, value: io.growing.offline.business.statistic.daily.transform.ManualUpLoadUserPropsTask@722fa16d)
  - field (class: io.growing.offline.business.statistic.daily.transform.ManualUpLoadUserPropsTask$$anonfun$compute$1, name: $outer, type: class io.growing.offline.business.statistic.daily.transform.ManualUpLoadUserPropsTask)
  - object (class io.growing.offline.business.statistic.daily.transform.ManualUpLoadUserPropsTask$$anonfun$compute$1, <function1>)
  - field (class: org.apache.spark.sql.execution.MapPartitionsExec, name: func, type: interface scala.Function1)
  - object (class org.apache.spark.sql.execution.MapPartitionsExec, MapPartitions <function1>, obj#29: io.growing.offline.business.statistic.daily.transform.UserProps
+- DeserializeToObject createexternalrow(cs1#10.toString, user_created_at#11.toString, life_cycle_ppl#12.toString, StructField(cs1,StringType,true), StructField(user_created_at,StringType,true), StructField(life_cycle_ppl,StringType,true)), obj#28: org.apache.spark.sql.Row
   +- *(1) FileScan csv [cs1#10,user_created_at#11,life_cycle_ppl#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://growingFS/user/douyp/user_props], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<cs1:string,user_created_at:string,life_cycle_ppl:string>
)
  - field (class: org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$5, name: $outer, type: class org.apache.spark.sql.execution.MapPartitionsExec)
  - object (class org.apache.spark.sql.execution.MapPartitionsExec$$anonfun$5, <function1>)
  - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, name: f$23, type: interface scala.Function1)
  - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, <function0>)
  - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1)
  - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24, <function3>)
  - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3)
  - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[12] at insertInto at ManualUserPropsJob.scala:81)
  - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
  - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@228674ad)
  - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
  - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@403d5738)
  - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
  - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@228674ad))
  - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
  - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[13] at insertInto at ManualUserPropsJob.scala:81)
  - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
  - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@589eec78)
  - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
  - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@66f2f72b)
  - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
  - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@589eec78))
  - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
  - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[14] at insertInto at ManualUserPropsJob.scala:81)
  - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
  - object (class scala.Tuple2, (MapPartitionsRDD[14] at insertInto at ManualUserPropsJob.scala:81,<function2>))
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
  at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1165)
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:1069)
  at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1013)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2067)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
  ... 31 more
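
The telling lines in the trace are the "field ... name: $outer" entries near the top: the anonymous function compiled for the mapPartitions closure ($$anonfun$compute$1) holds a reference to the enclosing ManualUpLoadUserPropsTask instance, and that instance is what fails to serialize. Besides the local-val copy sketched above, another common option is to make the task class itself Serializable and mark members that cannot be serialized, such as the SparkSession, as @transient. The constructor and fields below are only a hypothetical shape, since the real class is not shown in the question:

import org.apache.spark.sql.SparkSession

class ManualUpLoadUserPropsTask(csvPath: String) extends Serializable {  // hypothetical signature
  // SparkSession is not serializable, so it must not travel with the closure;
  // re-acquire it lazily on the driver instead.
  @transient private lazy val spark: SparkSession = SparkSession.builder().getOrCreate()
  // Any field the closure touches (e.g. the Broadcast[UploadJobInfo]) must itself be serializable.
  // ...
}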
