专用数据流buysidevolumewma(数据流buypressuretradestream){
Integer windowSize = 3;
Integer windowslide = 1;
DataStream<Double> buySideVolumeWMAStream = buyPressureTradeStream.countWindowAll(windowSize, windowslide)
.apply(new AllWindowFunction<String, Double, GlobalWindow>() {
@Override
public void apply(GlobalWindow window, Iterable<String> values, Collector<Double> out)
throws Exception {
Double buySideVolumeWMA = 0.0;
Integer weight = windowSize;
Integer numerator = 1;
for (String tradeString : values) {
JSONObject json = new JSONObject(tradeString);
Double tradeVolume = (Double) json.get("Volume");
buySideVolumeWMA += ((tradeVolume * numerator) / weight);
slf4jLogger.info("tradeVolume " + tradeVolume + " , " + "numerator , " + numerator
+ " weight , " + weight + " buySideVolumeWMA " + buySideVolumeWMA);
numerator++;
}
numerator = 1;
out.collect(buySideVolumeWMA / 2);
buySideVolumePressure = buySideVolumeWMA / 2;
// slf4jLogger.info("buySideVolumePressure :" +
// buySideVolumePressure);
buySideVolumeWMAStream.print().setParallelism(5);
return buySideVolumeWMAStream;
}
=============================================================================================在这个程序中,我使用的窗口大小为3,幻灯片大小为1。我希望它在接收到计数3的流数据后开始滑动,然后只按1开始滑动。但是我的程序在接收到第一个数据时会立即开始滑动,然后每接收到一个数据就滑动一次,那么如何使它只在接收到计数3的数据之后滑动,然后再滑动1呢?
2条答案
按热度按时间eivgtgni1#
可以向窗口添加偏移。这是window命令的第三个参数。在我看来,这样你可以晚些时候开始。
文档中的示例:
要了解更多信息:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html
nhhxz33t2#
据我所知,在2019年11月和flink 1.9.1中,这不是滑动窗的一个功能。我理解这是因为窗口对象是分离的,不共享任何状态。例如,如果使用键控流,则为每个窗口和键复制和存储一次窗口中的对象。
下面的筛选器保持足够的状态以忽略它接收的前n条消息。如果使用键控流
.keyBy(...)
--每个键将保留一个单独的计数器,因为这是flink管理valuestate对象的方式。