apachebeam—在flink作业重新启动几次后,在堆转储中发现的side inputs数据的多个对象引用

vhipe2zx  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(214)

我们正在调查flink集群中一些与gc相关的内存泄漏问题,在此过程中我们发现了以下观察结果
我们在flink集群上运行beam作业,利用side-input特性将一些额外的数据提供给用于某些转换的主事件流
由于某种原因,我们的flink作业重新启动了,在重新启动后,当我们进行堆转储时,我们可以看到多个side输入数据副本处于flink作业的状态(我们使用ricksdb作为状态后端)
主流有一个小时的窗口

Window<KV<String, EventDetails>> window = Window.<KV<String, EventDetails>>into(FixedWindows.of(Duration.standardHours(hrs)))
                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().alignedTo(Duration.standardSeconds(500))))
                    .withAllowedLateness(Duration.ZERO, Window.ClosingBehavior.FIRE_ALWAYS)
                    .discardingFiredPanes();

边输入流也有窗口

PCollection<Map<String, Object>> sideinputSnapshot = sideInput
                .apply(Window.<Map<String, Object>>into(new GlobalWindows())
                        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                        .withAllowedLateness(Duration.ZERO).discardingFiredPanes());

        this.view = sideinputSnapshot.apply(View.asSingleton());

请告诉我们,代码逻辑有什么问题吗?或者是梁的侧面输入有问题?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题