我有两个数据集,我分组的关键
val stgDS = Seq(("1", "1"), ("1", "2"), ("1", "3"), ("1", "4"), ("1", "5"), ("2", "1"), ("2", "2"), ("2", "3"), ("2", "4"), ("2", "5"))
.toDF("number", "time")
.as[Stg]
val aggDS = Seq(("1", "1"), ("1", "4"), ("1", "8"), ("2", "2"), ("2", "5"))
.toDF("number", "time")
.as[Agg]
在那之后,我可以像这样对每个值应用一个函数
stgDS.groupByKey(_.number)
.flatMapGroups{case(k, iterator) => somefunction(iterator)}
我该怎么组合
stgDS.groupByKey(_.number)
aggDS.groupByKey(_.number)
得到像这样的东西
(k, (iteratorStg, iteratorAgg))
然后执行
.flatMapGroups{case(k, (iteratorStg, iteratorAgg)) => somefunction(iteratorStg, iteratorAgg)}
我正在研究combinebykey finction,但要么它只是分组的另一个变体,要么我不知道它是如何工作的。
简单的连接不行,因为我想分别循环这些迭代器。
1条答案
按热度按时间n3h0vuf21#
两个keyvaluegroupeddataset可以与cogroup组合。
从cogroup文档:
将给定函数应用于每个共组数据。对于每个唯一的组,函数将被传递分组键和2个迭代器,其中包含来自数据集this和other的组中的所有元素。
代码
印刷品