我在apache flink中有一个非常简单的流式管道设置,管道工作,我能够对输入数据流应用processfunction,如下所示:
DataStream<MeasurementData> data = env.addSource(consumer);
DataStream<MeasurementData> dataProcessed =data.process(new FFT());
dataProcessed.print();
dataProcessed.addSink(new FlinkKafkaProducer011<>(
"localhost:9092", // Kafka broker host:port
OUTPUT_TOPIC, // Topic to write to
new MeasurementDataSchema()) // Serializer
);
现在我想应用一个processwindowfunction,它在特定时间的窗口上运行,而不是为每个传入的数据点应用这个函数。我这样试过:
DataStream<MeasurementData> dataProcessed = data.timeWindowAll(Time.minutes(5))
.process(new MyProcessWindowFunction());
以及myprocesswindowfunction()的定义:
public static class MyProcessWindowFunction extends ProcessAllWindowFunction<MeasurementData, MeasurementData, TimeWindow> {
public void process(Context context, Iterable<MeasurementData> input, Collector<MeasurementData> out) {
long count = 0;
for (MeasurementData data : input) {
for (int frequencyCounter = 0; frequencyCounter < data.data.size(); frequencyCounter++) {
matrices[frequencyCounter].addElement(data.u, data.v, data.data.get(frequencyCounter).get(0));
}
count++;
out.collect(data);
}
}
}
但是这个函数似乎永远不会被调用。我试着把print语句放在那里,还用调试器遍历了整个程序。我有什么遗漏吗?任何暗示都将不胜感激。
1条答案
按热度按时间zqry0prt1#
发现问题:环境设置为使用eventtime而不是processingtime,而我的数据不包含任何事件时间戳。