我使用kafka流来处理实时数据,并且需要对窗口时间的数据进行一些聚合操作。
关于聚合运算,我有两个问题。
如何获取聚合数据?我要把它送到第三服务中心。
聚合操作之后,我无法向第三个服务发送消息,代码无法运行。
这是我的密码:
stream = builder.stream("topic");
windowedKStream = stream.map(XXXXX).groupByKey().windowedBy("5mins");
ktable = windowedKStream.aggregate(()->"", new Aggregator(K,V,result));
// my data is stored in 'result' variable, but I can't get it at the end of the 5 mins window.
// I need to send the 'result' to a 3rd service. But I don't know where to temporarily store it and then how to get it.
// below is the code the call a 3rd service, but the code can't be executed(reachable).
// I think it should be executed every 5 mins when thewindows is over. But it isn't.
result = httpclient.execute('result');
1条答案
按热度按时间omqzjyyz1#
我想你可能会想做一些类似的事情:
每次
KTable
如果已更新(禁用缓存),则更新记录将发送到下游,并且foreach
将与v
是当前聚合结果。