类可序列化时任务不可序列化

wvt8vs2t  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(575)

我在scala有以下课程

case class A
  (a:Int,b:Int) extends Serializable

当我试穿spark 2.4的时候(通过databricks)

val textFile = sc.textFile(...) 
val df = textFile.map(_=>new A(2,3)).toDF()

(编辑:调用df.collect()或注册为表时出错)
我明白了 org.apache.spark.SparkException: Task not serializable 我错过了什么?
我尝试添加编码器:

implicit def AEncoder: org.apache.spark.sql.Encoder[A] = 
  org.apache.spark.sql.Encoders.kryo[A]

import spark.implicits._
import org.apache.spark.sql.Encoders

编辑:我也尝试过:

val df = textFile.map(_=>new A(2,3)).collect()

但到目前为止运气不好。

ux6nzvsh

ux6nzvsh1#

有时,这种情况会在databricks上间歇性发生。最烦人的。
重新启动集群,然后再试一次,我有时会遇到这个错误,重新启动后就没有发生。

9q78igpj

9q78igpj2#

您可以直接将文件解析为 Dataset 用你的案例课。

case class A(a:Int,b:Int) extends Serializable
val testRDD = spark.sparkContext.textFile("file:///test_file.csv")
val testDS = testRDD.map( line => line.split(",")).map(line_cols => A(line_cols(0).toInt, line_cols(1).toInt) ).toDS()

# res23: org.apache.spark.sql.Dataset[A] = [a: int, b: int]

相关问题