apache flink-timecharacteristic.eventtime的tumblingprocessingtimewindow用法

xpcnnkqh  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(382)

关闭。这个问题需要细节或清晰。它目前不接受答案。
**想改进这个问题吗?**通过编辑这个帖子来添加细节并澄清问题。

两年前关门了。
改进这个问题
看起来tumblingprocessingtimewindow总是使用“摄取时间”。有没有办法在事件时间强制打开窗口?
我的用例非常简单,我接收包含“事件时间戳”的事件,并希望它们基于事件时间进行聚合。
e、 g.在以下代码中,我期望有2个输出:

public class WindowExample {

private static final SimpleDateFormat FORMAT = new SimpleDateFormat("HH:mm:ss");

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    DataStreamSource<Bean> beans = env.fromElements(
        new Bean(1, 1, "12:00:00"),
        new Bean(1, 2, "12:00:03"),
        new Bean(1, 1, "12:00:04"),  //window of 3 sec trigger here
        new Bean(1, 2, "12:00:05"),
        new Bean(1, 3, "12:00:06"),
        new Bean(1, 3, "12:00:07")   //window of 3 sec trigger here
    );

    beans.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Bean>() {
        @Override public long extractAscendingTimestamp(Bean element) {
            return element.getTs();
        }
    })
        .keyBy("id")
        .window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
        .max("value")
        .addSink(new SinkFunction<Bean>() {

            @Override public void invoke(Bean value, Context context) {
                System.out.println("Sync on: "+value);
            }
        });
    env.execute("Windowing test");
}

public static class Bean {

    private int id;
    private int value;
    private long ts;

    public Bean() {
    }

    Bean(int id, int value, String time) throws ParseException {
        this.id = id;
        this.value = value;
        this.ts = FORMAT.parse(time).toInstant().toEpochMilli();
    }

    long getTs() {
        return ts;
    }
    // other getters and setters
}

}

8zzbczxx

8zzbczxx1#

flink允许对事件时间流使用处理时间窗口,因为这是合法的用例。但是如果你真的想要事件时间窗口,你需要要求它。在这种情况下,您应该使用 TumblingEventTimeWindows .

相关问题