我们使用kafka ktable进行聚合,下面是我们在“输入数据-事务详细信息”中接收的数据类型(事务id、状态、类别、金额……)
我们根据下面的分组键(status,category)对上面的内容进行分组
应用程序逻辑
Grouped Stream. Aggregate(() -> new Instance(), (key, newVal, aggVal) - > addAmount(newVal). (key, oldVal, aggVal) - > removeAmount(oldVal));
假设我们得到的数据流如下(事务ID、状态、类别、金额)
1-1,待定,现金,10//(待定,现金)-10合计价值
2-2,待定,现金,20/(待定,现金)-30
3-3,实际,卡,15/(实际,卡)-15
4 - 1. 挂起,卡,9/(挂起,现金)-30,(挂起,卡)-9----这就是我们遇到的问题
在#4中,虽然更新了相同事务ID 1上的,但分组键发生了变化(从现金到卡),现在由于分组发生了变化,它不调用removeMoutn()方法,而只调用addamount()方法。
关于如何解决这个问题的任何想法,如果分组发生了变化,它也应该处理早期聚合的数据。
我在这里发现了类似的用例https://stackoverflow.com/a/42685866/2699756
但我不知道你是怎么解决这个问题的。
暂无答案!
目前还没有任何答案,快来回答吧!