spark3.0在scala/java上的排序与应用

sqxo8psd  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(419)

我有星火数据集,让我们看看a,b,c
我想带你去
a列上的组
对b列上的组(不是整个数据集)进行排序
迭代单个组,在连续的n行之间寻找某种序列/模式,并基于形成结果数据集的critera返回行

dataset.groupBy(0).sortGroup(1, Order.ASCENDING)
                .reduceGroup({})

在Pypark

我们可以在pandas上调用apply函数,在pandas上进行分组和排序,但是它的速度比flink慢10倍
注意:我想对分组数据进行处理,并返回另一个不是标准聚合的数据集
有人能给我介绍一下类似的关于如何在spark中使用java/scala的代码吗?

uinbv5nw

uinbv5nw1#

根据迭代逻辑,有几种可能的方法:

使用数据集api

鉴于

val df =
      Seq(("a", 0, "foo"), ("b", 1, "foo"), ("a", 1, "foobar"))
        .toDF("A", "B", "C")

先预处理一下

df.select($"A", struct($"B", $"C") as $"S").show()

得到

+---+-----------+
|  A|          S|
+---+-----------+
|  a|   [0, foo]|
|  b|   [1, foo]|
|  a|[1, foobar]|
+---+-----------+

现在我们可以将任何scala代码应用于元组序列,包括排序:

df.select($"A", struct($"B", $"C") as $"S")
      .groupBy("A")
      .agg(collect_list("S"))
      .as[(String, Seq[(Int, String)])]
      .map {
        case (a, l) => (a, l.sortBy(_._1).map(_._2).maxBy(_.length))
      }
      .show()

使用udafs

实现自定义udaf:

class MyAgg extends Aggregator[
      (Int, String),
      mutable.ListBuffer[(Int, String)],
      /* any output type here */] {
...

并用它进行聚合:

val myagg = udaf(new MyAgg())
df.select($"A", struct($"B", $"C") as "S").groupBy($"A").agg(myagg($"S"))

相关问题