Running the Spark Aggregator example

rn0zuynd · posted 2021-05-27 in Spark

I am trying to run the Aggregator example from the Spark 2.4.3 documentation on a Databricks cluster.
I added the missing methods, so the code looks like this:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import spark.implicits._ // `spark` is the SparkSession predefined in Databricks notebooks

case class Data(i: Int)

val customSummer = new Aggregator[Data, Int, Int] {
  def zero: Int = 0                          // initial aggregation buffer
  def reduce(b: Int, a: Data): Int = b + a.i // fold one row into the buffer
  def merge(b1: Int, b2: Int): Int = b1 + b2 // combine two partial buffers
  def finish(r: Int): Int = r                // produce the final result
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}.toColumn

val ds = Seq(Data(1)).toDS
val aggregated = ds.select(customSummer).collect

The error I get is: org.apache.spark.SparkException: Task not serializable. In the stack trace I found: Caused by: java.io.NotSerializableException: org.apache.spark.sql.TypedColumn. Here is the full stack trace.
My question is: has anyone been able to run similar code? If so, could you point me to resources that would show me what I am missing?
Thanks.


ebdffaop1#

Move the case class outside of the test class, and make the outer class that holds the Aggregator extend Serializable.

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

class Test extends Serializable {

  @org.junit.Test // fully qualified so the annotation is not shadowed by this class's name
  def test62805430(): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val customSummer = new Aggregator[Data, Int, Int] {
      def zero: Int = 0
      def reduce(b: Int, a: Data): Int = b + a.i
      def merge(b1: Int, b2: Int): Int = b1 + b2
      def finish(r: Int): Int = r
      def bufferEncoder: Encoder[Int] = Encoders.scalaInt
      def outputEncoder: Encoder[Int] = Encoders.scalaInt
    }.toColumn

    val ds = Seq(Data(1)).toDS
    val aggregated = ds.select(customSummer).collect
    println(aggregated.mkString(",")) // 1
  }

}

// defined at top level, outside the test class, so the generated
// encoder does not capture a non-serializable enclosing instance
case class Data(i: Int)
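
Alternatively, a minimal sketch of the pattern used in the Spark SQL documentation's type-safe aggregation example: declaring the Aggregator as a top-level object (the name CustomSummer here is illustrative, not from the question) means there is no enclosing instance to capture at all, so the serialization problem cannot arise in the first place.

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// A top-level object has no outer instance to capture,
// so nothing non-serializable ends up in the task closure.
object CustomSummer extends Aggregator[Data, Int, Int] {
  def zero: Int = 0
  def reduce(b: Int, a: Data): Int = b + a.i
  def merge(b1: Int, b2: Int): Int = b1 + b2
  def finish(r: Int): Int = r
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

Usage is the same as above (in a notebook, with spark.implicits._ in scope): ds.select(CustomSummer.toColumn).collect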
