ComTime 2 KeyValueGroupedData集

j8ag8udp  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(281)

我有两个数据集,我分组的关键

val stgDS = Seq(("1", "1"), ("1", "2"), ("1", "3"), ("1", "4"), ("1", "5"), ("2", "1"), ("2", "2"), ("2", "3"), ("2", "4"), ("2", "5"))
      .toDF("number", "time")
      .as[Stg]

val aggDS = Seq(("1", "1"), ("1", "4"), ("1", "8"), ("2", "2"), ("2", "5"))
  .toDF("number", "time")
  .as[Agg]

在那之后,我可以像这样对每个值应用一个函数

stgDS.groupByKey(_.number)
  .flatMapGroups{case(k, iterator) => somefunction(iterator)}

我该怎么组合

stgDS.groupByKey(_.number)
aggDS.groupByKey(_.number)

得到像这样的东西

(k, (iteratorStg, iteratorAgg))

然后执行

.flatMapGroups{case(k, (iteratorStg, iteratorAgg)) => somefunction(iteratorStg, iteratorAgg)}

我正在研究combinebykey finction,但要么它只是分组的另一个变体,要么我不知道它是如何工作的。
简单的连接不行,因为我想分别循环这些迭代器。

n3h0vuf2

n3h0vuf21#

两个keyvaluegroupeddataset可以与cogroup组合。
从cogroup文档:
将给定函数应用于每个共组数据。对于每个唯一的组,函数将被传递分组键和2个迭代器,其中包含来自数据集this和other的组中的所有元素。
代码

val stgGroupedDS = stgDS.groupByKey(_.number)
val aggGroupedDS = aggDS.groupByKey(_.number)

stgGroupedDS.cogroup(aggGroupedDS)(
    //run whatever logic is required and then return an iterator
    (key:String, it1:Iterator[Stg], it2:Iterator[Avg])  
        => Seq((key, it1.toList.mkString(",") + "//" + it2.toList.mkString(",")))
              .iterator
)
.show(false)

印刷品

+---+------------------------------------------------------------------------+
|_1 |_2                                                                      |
+---+------------------------------------------------------------------------+
|1  |Stg(1,1),Stg(1,2),Stg(1,3),Stg(1,4),Stg(1,5)//Avg(1,1),Avg(1,4),Avg(1,8)|
|2  |Stg(2,1),Stg(2,2),Stg(2,3),Stg(2,4),Stg(2,5)//Avg(2,2),Avg(2,5)         |
+---+------------------------------------------------------------------------+

相关问题