尽管将setBufferError设置为0,但我已经测量到keyBy操作的延迟在大多数情况下通常在0-1 ms之间。然而,随着程序继续运行,我偶尔会观察到延迟达到数千毫秒。这是正常行为吗?
在这段代码中,我记录了keyBy操作前后的时间,然后我注意到了前面提到的情况。
originalTrajectories = originalTrajectories.keyBy(t -> t.getGridID()).map(new MapFunction<Trajectory1, Trajectory1>() {
@Override
public Trajectory1 map(Trajectory1 value) {
value.t2 = System.currentTimeMillis();
return value;
}
});
queryTrajectories = queryTrajectories.keyBy(t -> t.getGridID()).map(new MapFunction<Trajectory1, Trajectory1>() {
@Override
public Trajectory1 map(Trajectory1 value) {
value.t2 = System.currentTimeMillis();
return value;
}
});
DataStream<Tuple4<Long, Long, Trajectory1, Trajectory1>> timedResultTrajectories = originalTrajectories
.keyBy(t -> t.getGridID())
.intervalJoin(queryTrajectories.keyBy(t -> t.getGridID()))
.between(Time.milliseconds(-slideStep * 1000), Time.milliseconds(slideStep * 1000))
.process(new ProcessJoinFunction<Trajectory1, Trajectory1, Tuple4<Trajectory1, Trajectory1, Long, Double>>() {
@Override
public void processElement(Trajectory1 t1, Trajectory1 t2, ProcessJoinFunction<Trajectory1, Trajectory1, Tuple4<Trajectory1, Trajectory1, Long, Double>>.Context ctx, Collector<Tuple4<Trajectory1, Trajectory1, Long, Double>> out) {
if (Math.abs(t1.getTimestamp() - t2.getTimestamp()) == 0) {
Double distance = calculateClosestPairDistance(t1, t2);
if (distance <= queryRadius) {
out.collect(Tuple4.of(t1, t2, System.currentTimeMillis(), distance));
}
}
}
})
字符串
1条答案
按热度按时间ymzxtsji1#
当然,偶尔的延迟也是可能的,如果像您指出的那样,延迟长达几秒钟,我会将垃圾收集视为一个可能的原因。
Flink 1.18包含对Java 17的实验性支持。如果GC确实是问题所在,您可以考虑尝试Java 17及其改进GC的选项。如果您想更进一步,Flink的主分支现在包含对JDK 21的实验性支持,其中ZGC已更新为添加多代GC。
根据其他人的报告,我希望使用Java 17和ZGC的Flink 1.18在最坏情况下的延迟方面有显着改善,但吞吐量会有所下降。Java 21的初步实验显示吞吐量有希望改善。