1.6.2中

62o28rlo  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(367)

我正在使用intervaljoin函数在10分钟内连接两个流。具体如下:

labelStream.intervalJoin(adLogStream)
           .between(Time.milliseconds(0), Time.milliseconds(600000))
           .process(new processFunction())
           .sink(kafkaProducer)

labelstream和adlogstream是由长id键控的原始buf类。
我们的两个输入流是巨大的。运行大约30分钟后,Kafka的输出缓慢下降,如下所示:

当数据输出开始下降时,我使用jstack和pstack多次得到以下结果:


似乎程序在rockdb的seek中受阻了。我发现一些rockdb的srt文件被迭代访问得很慢。

我试过几种方法:

1)Reduce the input amount to half. This works well.
2)Replace labelStream and adLogStream with simple Strings. This way, data amount will not change. This works well.
3)Use PredefinedOptions like SPINNING_DISK_OPTIMIZED and SPINNING_DISK_OPTIMIZED_HIGH_MEM. This still fails.
4)Use new versions of rocksdbjni. This still fails.

谁能给我一些建议吗?非常感谢你。

bvk5enib

bvk5enib1#

一些想法:
您可以在flink用户邮件列表上提问——一般来说,像这样的操作性问题更可能在邮件列表上而不是在堆栈溢出上引发有见地的响应。
我听说如果rocksdb有更多的堆外内存可供使用,它会有所帮助,因为rocksdb将使用它进行缓存。抱歉,我不知道如何配置这个的细节。
也许增加并行性会有所帮助。
如果有可能的话,试着用基于堆的状态后端来运行可能会很有趣,看看rocksdb造成了多大的痛苦。

相关问题