Java Hazelcast Jet: how to get each group's values in one go after rollingAggregate?

gcuhipw9  posted on 2023-06-20  in Java

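I am trying to aggregate data with rollingAggregate, but because it behaves like stream processing it emits results asynchronously, item by item. What I need is the sum of amount, grouped by id and country.
Here is my code: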

BatchStage<Object> list = // coming from jdbc

list.groupingKey(data -> {
  // grouping key logic for ID, Country
}).rollingAggregate(AggregateOperations.toList())
.map(entry -> {

      if (((Entry<String, List<Object>>) entry).getKey().equals("36465,Indonesia")) {
        // Here, this ID and Country arrive twice with different records,
        // but I want each grouping key exactly once.
      }

      List<Object> resultRow = new ArrayList<>();
      ((Entry<String, List<Object>>) entry).getValue().forEach(data -> {

      });
      return resultRow;
});

Data (here I am fetching the data from a JDBC source):

[['id','country','id2','amount']
 [3638, Dominican Republic, 'Qee', 973029], 
 [3638, Dominican Republic, 'Hee', 95571], 
 [3668, USA, 'Fee', 986839], 
 [3668, USA, 'CEE', 201017]]

Result needed (but since I have millions of records, I do not want to group everything in one go):

[['id','country','id2','amount']
 [3638, Dominican Republic, 1068600],  
 [3668, USA,1187856]]

I am grouping by the id and country columns.
The same grouping key arrives in the map twice.
The first time I get:

{"3638,Dominican Republic" : [[3638, Dominican Republic, 'Hee', 95571]]}

The second time I get:

{"3638,Dominican Republic" : [[3638, Dominican Republic, 'Qee', 973029]]}

But I need it in one go (maybe this happens because of the JDBC source that feeds the data into the BatchStage?):

{"3638,Dominican Republic" : [[3638, Dominican Republic, 'Qee', 973029], 
 [3638, Dominican Republic, 'Hee', 95571]]}

So, can anyone help with how to get all the records for each grouping key in one go, without getting all the grouping keys in one go?

j9per5c4 #1

rollingAggregate emits the aggregation result after every input item; see the JavaDoc.
So you should see the following:

3638,Dominican Republic=[Item[id=3638, country=Dominican Republic, id2=Qee, amount=973029]]
3638,Dominican Republic=[Item[id=3638, country=Dominican Republic, id2=Qee, amount=973029], Item[id=3638, country=Dominican Republic, id2=Hee, amount=95571]]
3668,USA=[Item[id=3668, country=USA, id2=Fee, amount=986839]]
3668,USA=[Item[id=3668, country=USA, id2=Fee, amount=986839], Item[id=3668, country=USA, id2=Cee, amount=201017]]

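First you receive the key with 1 item, then with 2 items, and so on. If you only ever see one item at a time, your grouping function is probably wrong.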
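The per-item output described above can be imitated in plain Java. The following is only a minimal sketch of the semantics, not Hazelcast code: the class `RollingAggregateSketch` and the helper `rolling` are hypothetical names made up for illustration, and the `Item` record simply mirrors the sample data.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RollingAggregateSketch {
    // Mirrors the sample rows from the question.
    record Item(int id, String country, String id2, int amount) {}

    // Hypothetical helper: imitates a rolling "collect to list" aggregation
    // by emitting one output entry for every input item.
    static List<Map.Entry<String, List<Item>>> rolling(List<Item> input) {
        Map<String, List<Item>> state = new LinkedHashMap<>();
        List<Map.Entry<String, List<Item>>> emitted = new ArrayList<>();
        for (Item item : input) {
            String key = item.id() + "," + item.country();
            List<Item> soFar = state.computeIfAbsent(key, k -> new ArrayList<>());
            soFar.add(item);
            // Emit a snapshot of everything accumulated for this key so far.
            emitted.add(Map.entry(key, List.copyOf(soFar)));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Item> input = List.of(
                new Item(3638, "Dominican Republic", "Qee", 973029),
                new Item(3638, "Dominican Republic", "Hee", 95571),
                new Item(3668, "USA", "Fee", 986839),
                new Item(3668, "USA", "Cee", 201017));
        // Four inputs produce four outputs: each key shows up once with
        // one item and once more with two items.
        rolling(input).forEach(e ->
                System.out.println(e.getKey() + " -> " + e.getValue().size() + " item(s)"));
    }
}
```

If the grouping key were computed differently for two rows that belong together, each row would start its own list, which would match the "one item at a time" symptom.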
If you do not want the intermediate results, only the final aggregate over all items, you should use aggregate:

3668,USA=[Item[id=3668, country=USA, id2=Fee, amount=986839], Item[id=3668, country=USA, id2=Cee, amount=201017]]
3638,Dominican Republic=[Item[id=3638, country=Dominican Republic, id2=Qee, amount=973029], Item[id=3638, country=Dominican Republic, id2=Hee, amount=95571]]

which is produced by the following code:

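import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;
import java.io.Serializable;

// Items must be Serializable so they can travel between pipeline stages.
public record Item(int id, String country, String id2, int amount) implements Serializable {
}

public static void main(String[] args) {

    Pipeline p = Pipeline.create();
    p.readFrom(TestSources.items(
             new Item(3638, "Dominican Republic", "Qee", 973029),
             new Item(3638, "Dominican Republic", "Hee", 95571),
             new Item(3668, "USA", "Fee", 986839),
             new Item(3668, "USA", "Cee", 201017)
     )).groupingKey(item -> item.id() + "," + item.country()) // composite key: "id,country"
     .aggregate(AggregateOperations.toList())                 // one final list per key
     .writeTo(Sinks.logger());

    HazelcastInstance hz = Hazelcast.bootstrappedInstance();
    hz.getJet().newJob(p).join();
}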
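The question actually asks for the sum of amount per id and country rather than the list of rows. In a Jet pipeline, that would presumably mean passing `AggregateOperations.summingLong(...)` to `aggregate` in place of `toList()`. The sketch below is not Hazelcast code; it only checks the expected totals for the sample data with plain `java.util.stream` collectors, and the class name `GroupedSumSketch` and method `sumByIdAndCountry` are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class GroupedSumSketch {
    // Mirrors the sample rows from the question.
    record Item(int id, String country, String id2, int amount) {}

    // Groups by the same "id,country" key and sums the amounts per key.
    // A TreeMap keeps the keys sorted so the printed order is stable.
    static Map<String, Long> sumByIdAndCountry(List<Item> items) {
        return items.stream().collect(Collectors.groupingBy(
                item -> item.id() + "," + item.country(),
                TreeMap::new,
                Collectors.summingLong(Item::amount)));
    }

    public static void main(String[] args) {
        List<Item> items = List.of(
                new Item(3638, "Dominican Republic", "Qee", 973029),
                new Item(3638, "Dominican Republic", "Hee", 95571),
                new Item(3668, "USA", "Fee", 986839),
                new Item(3668, "USA", "Cee", 201017));
        // Prints:
        // 3638,Dominican Republic=1068600
        // 3668,USA=1187856
        sumByIdAndCountry(items).forEach((key, total) ->
                System.out.println(key + "=" + total));
    }
}
```

A summing aggregation keeps only one running total per key instead of every row, which matters when the input has millions of records.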
