Flink watermarks with event time

qhhrdooz  posted on 2022-12-09  in Apache

I'm trying to understand watermarks with event time.
My code is similar to the WordCount example in the Flink documentation.
I made some changes to include a timestamp on each event and added watermarks.

**Event format:** word;timestamp

The map function creates a Tuple3 of (word, 1, timestamp).
A watermark strategy is then assigned, with a timestamp assigner that reads the event's timestamp field.
For the following stream of events:

test;1662128808294
test;1662128818065
test;1662128822434
test;1662128826434
test;1662128831175
test;1662128836581

I got the following result: (test,6). This is correct, since I sent the word "test" 6 times.
But looking at the context in the ProcessWindowFunction, I see the following:

Processing Time: Fri Sep 02 15:27:20 WEST 2022
Watermark: Fri Sep 02 15:26:56 WEST 2022
Start Window: 2022 09 02 15:26:40 End Window: 2022 09 02 15:27:20

The window is correct: it is a 40-second window, as defined. The watermark is also correct: it is 20 seconds behind the last event timestamp (1662128836581 = Friday, September 2, 2022 3:27:16 PM), as defined in the watermark strategy.
My question is about the processing time at which the window fired. The window fired exactly when processing time reached the end of the window. Shouldn't it wait until the watermark passes the end of the window (something like processing time = end of window + 20 seconds), as described in the docs for the default window trigger?
What am I doing wrong? Or am I misunderstanding watermarks?
My code:

public class DataStreamJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        WatermarkStrategy<Tuple3<String, Integer, Long>> strategy = WatermarkStrategy
                .<Tuple3<String, Integer, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((event, timestamp) -> event.f2);

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .map(new Splitter())
                .assignTimestampsAndWatermarks(strategy)
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(40)))
                .process(new MyProcessWindowFunction());

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter extends RichMapFunction<String, Tuple3<String, Integer, Long>> {

        @Override
        public Tuple3<String, Integer, Long> map(String value) throws Exception {
            String[] word = value.split(";");
            return new Tuple3<String, Integer, Long>(word[0], 1, Long.parseLong(word[1]));
        }
    }

    public static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, String, TimeWindow> {

        @Override
        public void process(String s, ProcessWindowFunction<Tuple3<String, Integer, Long>, Tuple2<String, Integer>, String, TimeWindow>.Context context, Iterable<Tuple3<String, Integer, Long>> elements, Collector<Tuple2<String, Integer>> out) throws Exception {
            Integer sum = 0;
            for (Tuple3<String, Integer, Long> in : elements) {
                sum++;
            }

            out.collect(new Tuple2<String, Integer>(s, sum));
            Date date = new Date(context.window().getStart());
            Date date2 = new Date(context.window().getEnd());
            Date watermark = new Date(context.currentWatermark());
            Date processingTime = new Date(context.currentProcessingTime());
            System.out.println(context.currentWatermark());
            System.out.println("Processing Time: " + processingTime);
            Format format = new SimpleDateFormat("yyyy MM dd HH:mm:ss");
            System.out.println("Watermark: " + watermark);
            System.out.println("Start Window: " + format.format(date) + " End Window: " + format.format(date2));
        }
    }
}

Thanks.

tcomlyy6


To get event-time windows, you need to change

.window(TumblingProcessingTimeWindows.of(Time.seconds(40)))

to

.window(TumblingEventTimeWindows.of(Time.seconds(40)))
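As a rough illustration of why this matters: a processing-time window fires on the wall clock and ignores watermarks, which is why the window in the question fired at exactly 15:27:20. With an event-time window, the default trigger fires only once the watermark reaches the window's last millisecond (end - 1), and `forBoundedOutOfOrderness` emits a watermark of (largest timestamp seen - bound - 1 ms). The sketch below reproduces that arithmetic in plain Java, without Flink, for the six events from the question. The class and method names (`WatermarkSketch`, `windowStart`, `watermarkAfter`, `eventTimeWindowFires`) are made up for this example and are not Flink APIs.

```java
public class WatermarkSketch {
    static final long WINDOW_SIZE_MS = 40_000L;       // 40-second tumbling window
    static final long OUT_OF_ORDERNESS_MS = 20_000L;  // bound from the watermark strategy

    // Start of the tumbling window (offset 0) that contains the timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Watermark a bounded-out-of-orderness generator would emit
    // after having seen maxTimestampSeen as the largest timestamp.
    static long watermarkAfter(long maxTimestampSeen, long outOfOrderness) {
        return maxTimestampSeen - outOfOrderness - 1;
    }

    // An event-time window is due once the watermark reaches its last millisecond.
    static boolean eventTimeWindowFires(long watermark, long windowEnd) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long[] events = {
            1662128808294L, 1662128818065L, 1662128822434L,
            1662128826434L, 1662128831175L, 1662128836581L
        };

        long maxSeen = Long.MIN_VALUE;
        for (long ts : events) {
            maxSeen = Math.max(maxSeen, ts);
        }

        long start = windowStart(maxSeen, WINDOW_SIZE_MS);
        long end = start + WINDOW_SIZE_MS;
        long watermark = watermarkAfter(maxSeen, OUT_OF_ORDERNESS_MS);

        System.out.println("window:    [" + start + ", " + end + ")");
        System.out.println("watermark: " + watermark);
        System.out.println("fires after these six events? "
                + eventTimeWindowFires(watermark, end));

        // Smallest event timestamp whose watermark would close the window.
        long needed = end + OUT_OF_ORDERNESS_MS;
        System.out.println("fires after an event at " + needed + "? "
                + eventTimeWindowFires(watermarkAfter(needed, OUT_OF_ORDERNESS_MS), end));
    }
}
```

Under these assumptions, the six events alone leave the watermark 20 seconds behind the last event, so an event-time window would stay open until a later event (timestamp at least window end + 20 seconds) pushes the watermark forward. In a real job, watermarks are emitted periodically, so the actual firing moment also depends on the configured watermark interval.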
