Kafka Streams -类型JoinWindows的(Duration)方法已过时

cyvaqqii  于 2022-12-11  发布在  Apache
关注(0)|答案(1)|浏览(279)

我想使用JoinWindows.of(持续时间)将一个KStream〈String,String〉与一个KTable〈Windowed,int[]〉连接起来,以获取最近一个小时的结果。
我的代码如下:

Duration windowSize = Duration.ofMinutes(60);
Duration advanceSize = Duration.ofMinutes(1);
TimeWindows hoppingWindow = TimeWindows.ofSizeWithNoGrace(windowSize).advanceBy(advanceSize);
Duration joinWindowSizeMs = Duration.ofHours(1);

// Aggregate to get [sum, count] in the last time window
KTable<Windowed<String>, int[]> averageTemp = mainStreamStandard.groupByKey()
.windowedBy(hoppingWindow)
.aggregate( () -> new int[]{0 ,0}, (aggKey, newVal, aggValue) -> {
        aggValue[0] += Integer.valueOf(newVal.split(":")[1]);
        aggValue[1] += 1;  
        return aggValue;
        }, Materialized.with(Serdes.String(), new IntArraySerde()));
        
// Join weather stations with their [sum,count] and their respective red alert events
KStream<String, String> joined = mainStreamAlert.join(averageTemp,
JoinWindows.of(joinWindowSizeMs),
(leftValue, rightValue) -> "left/" + leftValue + "/right/" + rightValue[0]/rightValue[1]);

它给出了一个错误消息“类型JoinWindows中的(Duration)方法已过时”。它还告诉我将“join”方法更改为“leftJoin”,但它没有更改任何内容。
有什么更好的方法吗?

oaxa6hgo

oaxa6hgo1#

of(Duration timeDifference)实际上已过时。
这个方法的直接替代是ofTimeDifferenceAndGrace(Duration timeDifference, Duration afterWindowEnd)
被弃用的方法的默认宽限期为24小时。因此,您的代码将如下所示:

...
Duration joinWindowSizeMs = Duration.ofHours(1);
Duration gracePeriod = Duration.ofHours(24);
...
// Join weather stations with their [sum,count] and their respective red alert events
KStream<String, String> joined = mainStreamAlert.join(averageTemp,
JoinWindows.ofTimeDifferenceAndGrace(joinWindowSizeMs, gracePeriod),
(leftValue, rightValue) -> "left/" + leftValue + "/right/" + rightValue[0]/rightValue[1]);

加入Windows文档

相关问题