我试图对数据进行聚合,但我正在尝试rollingAggregate,但由于它是流处理,它处理asyn,但我需要的是我需要按id和国家进行金额和组的总和。
下面是我的代码:
BatchStage<Object> list= // coming from jdbc
list.groupingKey( data -> {
// grouping ke logic for ID, Country
}.rollingAggregate(AggregateOperations.toList()).
map(entry -> {
if (((Entry<String, List<Object>>)entry).getKey().equals("36465,Indonesia")) {
**//Here for this ID and Country it is coming twice with different records but for each grouping key I want it just once only.**
}
List<Object> resultRow = new ArrayList<>();
((Entry<String, List<Object>>)entry).getValue().stream().forEach(data -> {
});
return resultRow;
});
数据(这里我从Jdbc源代码中获取数据):
[['id','country','id2','amount']
[3638, Dominican Republic, 'Qee', 973029],
[3638, Dominican Republic, 'Hee', 95571],
[3668, USA, 'Fee', 986839],
[3668, USA, 'CEE', 201017]]
需要的结果(但由于我有数百万条记录,因此不想一次性分组):
[['id','country','id2','amount']
[3638, Dominican Republic, 1068600],
[3668, USA,1187856]]
我正在按id和国家列进行分组。
在分组的关键是两次在Map:
第一次得到:
{"3638,Dominican Republic" : [[3638, Dominican Republic, 'Hee', 95571]]}
第二次获得:
{"3638,Dominican Republic" : [[3638, Dominican Republic, 'Qee', 973029]]}
但是需要一次性完成(可能是因为使用了在Batchstage中发送数据的Jdbc源?):
{"3638,Dominican Republic" : [[3638, Dominican Republic, 'Qee', 973029],
[3638, Dominican Republic, 'Hee', 95571]]}
所以,任何人都可以帮助如何为每个分组关键明智的,我们得到了所有的记录在一个去,但我不希望所有的分组关键在一个去?
1条答案
按热度按时间j9per5c41#
rollingAggregate
输出每个输入项的聚合结果,请参阅JavaDoc。因此,您应该看到以下内容:
首先,您收到的关键1项,比2项等。如果您一次只能看到一个项目,则您的组功能可能出错。
如果你不想要中间结果,而只想要所有项目的最终聚合,你应该使用
aggregate
:其由以下代码产生: