我试图运行在databricks集群上找到的spark2.4.3文档中的示例。
我添加了缺少的方法,代码如下所示:
case class Data(i: Int)
val customSummer = new Aggregator[Data, Int, Int] {
def zero: Int = 0
def reduce(b: Int, a: Data): Int = b + a.i
def merge(b1: Int, b2: Int): Int = b1 + b2
def finish(r: Int): Int = r
def bufferEncoder: Encoder[Int] = org.apache.spark.sql.Encoders.scalaInt
def outputEncoder: Encoder[Int] = org.apache.spark.sql.Encoders.scalaInt
}.toColumn
val ds = Seq(Data(1)).toDS
val aggregated = ds.select(customSummer).collect
我得到的错误是: org.apache.spark.SparkException: Task not serializable
我在堆栈跟踪中发现: Caused by: java.io.NotSerializableException: org.apache.spark.sql.TypedColumn
这是完整的堆栈跟踪。
问题是,有人能运行类似的代码吗?如果是这样的话,你能告诉我哪些资源可以让我知道我缺少了什么吗?
谢谢。
1条答案
按热度按时间ebdffaop1#
将case类放到testclass之外,并使外部类具有
Aggregator
可序列化。