How do I port a UDAF to an Aggregator?

Posted by 8qgya5xd on 2021-05-27 in Spark

I have a DataFrame that looks like this:

time,channel,value
0,foo,5
0,bar,23
100,foo,42
...

I want a DataFrame like this instead:

time,foo,bar
0,5,23
100,42,...

In Spark 2, I used a UDAF like this:

case class ColumnBuilderUDAF(channels: Seq[String]) extends UserDefinedAggregateFunction {

  @transient lazy val inputSchema: StructType = StructType {
    StructField("channel", StringType, nullable = false) ::
      StructField("value", DoubleType, nullable = false) ::
      Nil
  }

  @transient lazy val bufferSchema: StructType = StructType {
    channels
      .toList
      .indices
      .map(i => StructField("c%d".format(i), DoubleType, nullable = false))
  }

  @transient lazy val dataType: DataType = bufferSchema

  @transient lazy val deterministic: Boolean = false

  // Start every channel slot as NaN, meaning "no value seen yet".
  def initialize(buffer: MutableAggregationBuffer): Unit = channels.indices.foreach(buffer(_) = Double.NaN)

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val channel = input.getAs[String](0)
    val p = channels.indexOf(channel)
    if (p >= 0 && p < channels.length) {
      val v = input.getAs[Double](1)
      if (!v.isNaN) {
        buffer(p) = v
      }
    }
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    channels
      .indices
      .foreach { i =>
        val v2 = buffer2.getAs[Double](i)
        if ((!v2.isNaN) && buffer1.getAs[Double](i).isNaN) {
          buffer1(i) = v2
        }
      }

  def evaluate(buffer: Row): Any =
    new GenericRowWithSchema(channels.indices.map(buffer.getAs[Double]).toArray, dataType.asInstanceOf[StructType])
}

I use it like this:

val cb = ColumnBuilderUDAF(Seq("foo", "bar"))
val dfColumnar = df.groupBy($"time").agg(cb($"channel", $"value") as "c")

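Then I rename c.c0, c.c1, and so on to foo, bar, and so on.
In Spark 3, UserDefinedAggregateFunction is deprecated and Aggregator should be used instead. So I started porting it like this: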

case class ColumnBuilder(channels: Seq[String]) extends Aggregator[(String, Double), Array[Double], Row] {

  lazy val bufferEncoder: Encoder[Array[Double]] = Encoders.javaSerialization[Array[Double]]

  lazy val zero: Array[Double] = channels.map(_ => Double.NaN).toArray

  def reduce(b: Array[Double], a: (String, Double)): Array[Double] = {
    val index = channels.indexOf(a._1)
    if (index >= 0 && !a._2.isNaN) b(index) = a._2
    b
  }

  def merge(b1: Array[Double], b2: Array[Double]): Array[Double] = {
    (0 until b1.length.min(b2.length)).foreach(i => if (b1(i).isNaN) b1(i) = b2(i))
    b1
  }

  def finish(reduction: Array[Double]): Row =
    new GenericRowWithSchema(reduction.map(x => x: Any), outputEncoder.schema)

  def outputEncoder: Encoder[Row] = ??? // what goes here?
}

I don't know how to implement Encoder[Row], because Spark does not provide a predefined one. If I take the naive approach, like this:

val outputEncoder: Encoder[Row] = new Encoder[Row] {
  val schema: StructType = StructType(channels.map(StructField(_, DoubleType, nullable = false)))

  val clsTag: ClassTag[Row] = classTag[Row]
}

I get a ClassCastException, because outputEncoder actually has to be an ExpressionEncoder.
So how do I implement this correctly? Or do I still have to use the deprecated UDAF?

eagi6jfj #1

You can use groupBy and pivot:

```
import spark.implicits._
import org.apache.spark.sql.functions._

val df = Seq(
  (0, "foo", 5),
  (0, "bar", 23),
  (100, "foo", 42)
).toDF("time", "channel", "value")

// One row per time, one column per distinct channel value.
df.groupBy("time")
  .pivot("channel")
  .agg(first("value"))
  .show(false)
```

Output:

```
+----+----+---+
|time|bar |foo|
+----+----+---+
|100 |null|42 |
|0   |23  |5  |
+----+----+---+
```
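
If an Aggregator is still wanted, one possible way around the Encoder[Row] problem is to avoid Row as the output type altogether. The following is only a minimal sketch, assuming Spark 3.x: it returns Seq[Double] so that stock encoders built with ExpressionEncoder (a Catalyst class, not part of the stable public API) can be used, and it registers the aggregator through functions.udaf. The helper name widen and the Seq[Double] output are illustrative choices, not part of the original code. Note that zero is a def here so that every group starts from a fresh array.

```scala
import org.apache.spark.sql.{DataFrame, Encoder}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.{col, udaf}

// Sketch of a port that outputs Seq[Double] instead of Row (an assumption made here
// to sidestep the need for an Encoder[Row]).
case class ColumnBuilder(channels: Seq[String])
    extends Aggregator[(String, Double), Array[Double], Seq[Double]] {

  // A fresh NaN-filled buffer per call; NaN means "no value seen yet".
  def zero: Array[Double] = Array.fill(channels.length)(Double.NaN)

  // Store the value in the slot of its channel; unknown channels and NaN are ignored.
  def reduce(b: Array[Double], a: (String, Double)): Array[Double] = {
    val i = channels.indexOf(a._1)
    if (i >= 0 && !a._2.isNaN) b(i) = a._2
    b
  }

  // Fill slots that are still NaN in b1 from b2.
  def merge(b1: Array[Double], b2: Array[Double]): Array[Double] = {
    b1.indices.foreach(i => if (b1(i).isNaN) b1(i) = b2(i))
    b1
  }

  def finish(reduction: Array[Double]): Seq[Double] = reduction.toSeq

  def bufferEncoder: Encoder[Array[Double]] = ExpressionEncoder[Array[Double]]()
  def outputEncoder: Encoder[Seq[Double]] = ExpressionEncoder[Seq[Double]]()
}

// Hypothetical helper: applies the aggregator to a (time, channel, value) DataFrame
// and unpacks the resulting array column into one column per channel.
def widen(df: DataFrame, channels: Seq[String]): DataFrame = {
  val cb = udaf(ColumnBuilder(channels))
  val aggregated = df.groupBy(col("time")).agg(cb(col("channel"), col("value")).as("c"))
  val channelCols = channels.zipWithIndex.map { case (name, i) => col("c")(i).as(name) }
  aggregated.select(col("time") +: channelCols: _*)
}
```

With the example data, widen(df, Seq("foo", "bar")) should yield columns time, foo, bar, with NaN rather than null in slots where a channel has no value for that time.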
