关闭。这个问题需要细节或清晰。它目前不接受答案。
**想改进这个问题吗?**通过编辑这个帖子来添加细节并澄清问题。
两年前关门了。
改进这个问题
看起来tumblingprocessingtimewindow总是使用“摄取时间”。有没有办法在事件时间强制打开窗口?
我的用例非常简单,我接收包含“事件时间戳”的事件,并希望它们基于事件时间进行聚合。
e、 g.在以下代码中,我期望有2个输出:
public class WindowExample {
private static final SimpleDateFormat FORMAT = new SimpleDateFormat("HH:mm:ss");
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<Bean> beans = env.fromElements(
new Bean(1, 1, "12:00:00"),
new Bean(1, 2, "12:00:03"),
new Bean(1, 1, "12:00:04"), //window of 3 sec trigger here
new Bean(1, 2, "12:00:05"),
new Bean(1, 3, "12:00:06"),
new Bean(1, 3, "12:00:07") //window of 3 sec trigger here
);
beans.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Bean>() {
@Override public long extractAscendingTimestamp(Bean element) {
return element.getTs();
}
})
.keyBy("id")
.window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
.max("value")
.addSink(new SinkFunction<Bean>() {
@Override public void invoke(Bean value, Context context) {
System.out.println("Sync on: "+value);
}
});
env.execute("Windowing test");
}
public static class Bean {
private int id;
private int value;
private long ts;
public Bean() {
}
Bean(int id, int value, String time) throws ParseException {
this.id = id;
this.value = value;
this.ts = FORMAT.parse(time).toInstant().toEpochMilli();
}
long getTs() {
return ts;
}
// other getters and setters
}
}
1条答案
按热度按时间8zzbczxx1#
flink允许对事件时间流使用处理时间窗口,因为这是合法的用例。但是如果你真的想要事件时间窗口,你需要要求它。在这种情况下,您应该使用
TumblingEventTimeWindows
.