我们正在调查flink集群中一些与gc相关的内存泄漏问题,在此过程中我们发现了以下观察结果
我们在flink集群上运行beam作业,利用side-input特性将一些额外的数据提供给用于某些转换的主事件流
由于某种原因,我们的flink作业重新启动了,在重新启动后,当我们进行堆转储时,我们可以看到多个side输入数据副本处于flink作业的状态(我们使用ricksdb作为状态后端)
主流有一个小时的窗口
Window<KV<String, EventDetails>> window = Window.<KV<String, EventDetails>>into(FixedWindows.of(Duration.standardHours(hrs)))
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().alignedTo(Duration.standardSeconds(500))))
.withAllowedLateness(Duration.ZERO, Window.ClosingBehavior.FIRE_ALWAYS)
.discardingFiredPanes();
边输入流也有窗口
PCollection<Map<String, Object>> sideinputSnapshot = sideInput
.apply(Window.<Map<String, Object>>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.withAllowedLateness(Duration.ZERO).discardingFiredPanes());
this.view = sideinputSnapshot.apply(View.asSingleton());
请告诉我们,代码逻辑有什么问题吗?或者是梁的侧面输入有问题?
暂无答案!
目前还没有任何答案,快来回答吧!