具有高并行性的网络缓冲区数量不足

gmxoilav  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(414)

我在尝试启动管道时遇到了一个错误,我想知道是否有任何方法可以解决这个问题,而不必经常使用配置:

java.io.IOException: Insufficient number of network buffers: required 1, but only 0 available. The total number of network buffers is currently set to 32768 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:268)
    at org.apache.flink.runtime.io.network.NetworkEnvironment.setupPartition(NetworkEnvironment.java:212)
    at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:193)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
    at java.lang.Thread.run(Thread.java:749)

正如错误所提示的,我尝试同时增加taskmanager.network.memory.fraction(0.1->0.7),然后增加taskmanager.network.memory.max(1 gb->4 gb),这对于我当前的配置应该足够了。
我当前的配置是:
10个示例/任务管理器
每个任务管理器25个并行/任务槽
总共250个任务槽
任务管理器堆大小-44 gb
跟随https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#configuring-计算的网络缓冲区应为:


# slots-per-TM^2 * #TMs * 4 * 32000 kb/buffer -> 25^2 * 10 * 4 * 32000 = 800 mb roughly

在默认的最大网络内存配置(最大1GB)下,以200并行度旋转管道可以正常工作,但是在超过210的情况下会遇到问题—我很难理解为什么在这种情况下4GB仍然不够。
任何提示都会很棒
谢谢
编辑(添加管线草图):

final SingleOutputStreamOperator<MyObject> stream = eventDataStream
            .rebalance() // evenly distribute the IO heavy augmenter work to all available workers.
            // Filter bad events
            .filter(new BadEventFilter())
            .name("BadEventFilter")
            .map(myMapper1)
            .name("myMapper1")
            .disableChaining()
            .flatMap(myFlatMapper)
            .name("myFlatMapper")
            .disableChaining()
            .map(myMapper2)
            .name("myMapper2")
            .startNewChain()
            .keyBy(mySelector)
            //Use the keyed events to create appropriate grouping
            .process(groupingProcessing)
            .name("groupingProcessing")
            .rebalance()
            .flatMap(myGroupingFlatMapper)
            .name("myGroupingFlatMapper")
            .startNewChain()
            .map(myMapper3)
            .name("myMapper3")
            .process(sideArchive)
            .name("sideArchive");
    stream.getSideOutput("myOutputTag")
            .addSink(sink)
            .name("archive");
    stream.addSink(sink)
            .name("sink");
col17t5w

col17t5w1#

问题是,您有太多的洗牌,因此需要大量的网络内存缓冲区。
公式


# slots-per-TM^2 * #TMs * 4

是一个洗牌步骤,如果我没弄错的话,你有7或8(2个重新平衡,2个新链开始,2个链禁用,2个下沉)。在这种情况下,即使是粗略的估计也是错误的 25^2 * 10 * 4 * 8 = 200,000 网络缓冲区,在你的情况下,你有一个多一点,然后 32,000 .
为什么简单的增加到 4Gb 无济于事的可以是 4Gb 未达到最大内存限制-实际容量计算如下 taskmanager.network.memory.fraction Flink管理的内存。
老实说,我不认为有任何理由禁用链和启动新的链-这只是引入了更多的洗牌,所以我建议不要调整网络设置,而是放弃新的链创建和简化一点管道:

final SingleOutputStreamOperator<MyObject> stream = eventDataStream
            // Filter bad events
            // filter before rebalance to reduce the network traffic.
            .filter(new BadEventFilter())
            .name("BadEventFilter")
            .rebalance() // evenly distribute the IO heavy augmenter work to all available workers.
            .map(myMapper1)
            .name("myMapper1")
            //.disableChaining()
            .flatMap(myFlatMapper)
            .name("myFlatMapper")
            //.disableChaining()
            .map(myMapper2)
            .name("myMapper2")
            //.startNewChain()
            .keyBy(mySelector)
            //Use the keyed events to create appropriate grouping
            .process(groupingProcessing)
            .name("groupingProcessing")
            //.rebalance()
            .flatMap(myGroupingFlatMapper)
            .name("myGroupingFlatMapper")
            //.startNewChain()
            .map(myMapper3)
            .name("myMapper3")
            .process(sideArchive)
            .name("sideArchive");
    stream.getSideOutput("myOutputTag")
            .addSink(sink)
            .name("archive");
    stream.addSink(sink)
            .name("sink");

相关问题