spark数据集中最大的k值

bmp9r5qi  于 2023-10-23  发布在  Apache
关注(0)|答案(1)|浏览(106)

我有一个跨多个分区的大型spark数据集。
| userId|竞赛ID|点|
| --|--|--|
| User1|竞赛1| 10 |
| User2|竞赛1| 15 |
| User3|竞赛2| 8 |
| 用户4|竞赛4| 20 |
上面的数据是巨大的,并且是均匀分区的。所以Contest 1数据分布在多个分区上。由于每个比赛的行数有很大的偏差,我不能按ContestID重新分区。
我想创建另一个数据集,它包含说,每个比赛的前10名用户。即对于每个比赛,其中10名用户得分最高的点。
如何并行处理多个分区,并对每个分区接收到的前10个值进行合并排序。我想避免排序,并在这里使用最大堆算法。

b1zrtrql

b1zrtrql1#

由于某些竞赛可能有大量的行,因此您不能按竞赛分组,收集(用户ID,点数)元组的列表并在最后选择10个最大的。
我们可以做的是定义一个topK Uplink(用户定义的聚合函数),它将在我们进行时聚合前K个元素,以避免创建具有太多行的分区。
让我们考虑以下数据集:

df.show()
+------+---------+------+
|userId|contestId|points|
+------+---------+------+
| User1| Contest1|    10|
| User2| Contest1|    15|
| User3| Contest2|     8|
| User4| Contest4|    20|
| User5| Contest1|    20|
| User6| Contest1|     5|
+------+---------+------+

然后,下面是Udash的代码及其用法:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import scala.reflect.runtime.universe.TypeTag

case class Elements[T: TypeTag](seq : Seq[T])

class TopKElements[T: TypeTag](val k : Int)(implicit ord : Ordering[T])
                extends Aggregator[T, Elements[T], Elements[T]] {
    override def zero: Elements[T] = Elements(Seq.empty)

    override def reduce(b: Elements[T], a: T): Elements[T] =
        Elements((b.seq :+ a).sorted(ord.reverse).take(k))

    override def merge(b1: Elements[T], b2: Elements[T]): Elements[T] =
        Elements((b1.seq ++ b2.seq).sorted(ord.reverse).take(k))

    override def finish(reduction: Elements[T]): Elements[T] = reduction

    override def bufferEncoder: Encoder[Elements[T]] = Encoders.product

    override def outputEncoder: Encoder[Elements[T]] = Encoders.product
}

val topK = udaf(new TopKElements[(Int, String)](3))

df.groupBy("contestid")
  .agg(topK('points, 'userid)("seq") as "top_k")
  .show(false)
+---------+---------------------------------------+
|contestid|top_k                                  |
+---------+---------------------------------------+
|Contest2 |[{8, User3}]                           |
|Contest1 |[{20, User5}, {15, User2}, {10, User1}]|
|Contest4 |[{20, User4}]                          |
+---------+---------------------------------------+

相关问题