Flink keyBy操作的高延迟

bf1o4zei  于 11个月前  发布在  Apache
关注(0)|答案(1)|浏览(144)

尽管将setBufferError设置为0,但我已经测量到keyBy操作的延迟在大多数情况下通常在0-1 ms之间。然而,随着程序继续运行,我偶尔会观察到延迟达到数千毫秒。这是正常行为吗?
在这段代码中,我记录了keyBy操作前后的时间,然后我注意到了前面提到的情况。

originalTrajectories = originalTrajectories.keyBy(t -> t.getGridID()).map(new MapFunction<Trajectory1, Trajectory1>() {
            @Override
            public Trajectory1 map(Trajectory1 value) {
                value.t2 = System.currentTimeMillis();
                return value;
            }
        });

queryTrajectories = queryTrajectories.keyBy(t -> t.getGridID()).map(new MapFunction<Trajectory1, Trajectory1>() {
            @Override
            public Trajectory1 map(Trajectory1 value) {
                value.t2 = System.currentTimeMillis();
                return value;
            }
        });

DataStream<Tuple4<Long, Long, Trajectory1, Trajectory1>> timedResultTrajectories = originalTrajectories
            .keyBy(t -> t.getGridID())
            .intervalJoin(queryTrajectories.keyBy(t -> t.getGridID()))
            .between(Time.milliseconds(-slideStep * 1000), Time.milliseconds(slideStep * 1000))
            .process(new ProcessJoinFunction<Trajectory1, Trajectory1, Tuple4<Trajectory1, Trajectory1, Long, Double>>() {
                @Override
                public void processElement(Trajectory1 t1, Trajectory1 t2, ProcessJoinFunction<Trajectory1, Trajectory1, Tuple4<Trajectory1, Trajectory1, Long, Double>>.Context ctx, Collector<Tuple4<Trajectory1, Trajectory1, Long, Double>> out) {
                    if (Math.abs(t1.getTimestamp() - t2.getTimestamp()) == 0) {
                        Double distance = calculateClosestPairDistance(t1, t2);
                        if (distance <= queryRadius) {
                            out.collect(Tuple4.of(t1, t2, System.currentTimeMillis(), distance));
                        }
                    }
                }
            })

字符串

ymzxtsji

ymzxtsji1#

当然,偶尔的延迟也是可能的,如果像您指出的那样,延迟长达几秒钟,我会将垃圾收集视为一个可能的原因。
Flink 1.18包含对Java 17的实验性支持。如果GC确实是问题所在,您可以考虑尝试Java 17及其改进GC的选项。如果您想更进一步,Flink的主分支现在包含对JDK 21的实验性支持,其中ZGC已更新为添加多代GC。
根据其他人的报告,我希望使用Java 17和ZGC的Flink 1.18在最坏情况下的延迟方面有显着改善,但吞吐量会有所下降。Java 21的初步实验显示吞吐量有希望改善。

相关问题