我想使用JoinWindows.of(持续时间)将一个KStream〈String,String〉与一个KTable〈Windowed,int[]〉连接起来,以获取最近一个小时的结果。
我的代码如下:
Duration windowSize = Duration.ofMinutes(60);
Duration advanceSize = Duration.ofMinutes(1);
TimeWindows hoppingWindow = TimeWindows.ofSizeWithNoGrace(windowSize).advanceBy(advanceSize);
Duration joinWindowSizeMs = Duration.ofHours(1);
// Aggregate to get [sum, count] in the last time window
KTable<Windowed<String>, int[]> averageTemp = mainStreamStandard.groupByKey()
.windowedBy(hoppingWindow)
.aggregate( () -> new int[]{0 ,0}, (aggKey, newVal, aggValue) -> {
aggValue[0] += Integer.valueOf(newVal.split(":")[1]);
aggValue[1] += 1;
return aggValue;
}, Materialized.with(Serdes.String(), new IntArraySerde()));
// Join weather stations with their [sum,count] and their respective red alert events
KStream<String, String> joined = mainStreamAlert.join(averageTemp,
JoinWindows.of(joinWindowSizeMs),
(leftValue, rightValue) -> "left/" + leftValue + "/right/" + rightValue[0]/rightValue[1]);
它给出了一个错误消息“类型JoinWindows中的(Duration)方法已过时”。它还告诉我将“join”方法更改为“leftJoin”,但它没有更改任何内容。
有什么更好的方法吗?
1条答案
按热度按时间oaxa6hgo1#
of(Duration timeDifference)
实际上已过时。这个方法的直接替代是
ofTimeDifferenceAndGrace(Duration timeDifference, Duration afterWindowEnd)
。被弃用的方法的默认宽限期为24小时。因此,您的代码将如下所示:
加入Windows文档