hadoop—是否可以保存flink数据集中批量迭代的部分输出?

bgibtngc  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(419)

我正在使用flink数据集api进行迭代计算。
但每次迭代的结果都是我完整解决方案的一部分。
(如果需要更多细节:我在每次迭代中从上到下逐层计算晶格节点,请参阅正式概念分析)
如果我在批量迭代中使用flink dataset api而不保存结果,代码如下所示:

val start = env.fromElements((0, BitSet.empty))
val end = start.iterateWithTermination(size) { inp =>
    val result = ObjData.mapPartition(new MyMapPartition).withBroadcastSet(inp, "concepts").groupBy(0).reduceGroup(new MyReduceGroup)
    (result,result)
}
end.count()

但是,如果我尝试在迭代(\ u0.writesText())或任何操作中写入部分结果,我将得到错误:

org.apache.flink.api.common.InvalidProgramException: A data set that is part of an iteration was used as a sink or action. Did you forget to close the iteration?

没有批量迭代的替代方案如下:

var start = env.fromElements((0, BitSet.empty))
var count = 1L
var all = count
while (count > 0){
    start = ObjData.mapPartition(new MyMapPartition).withBroadcastSet(start, "concepts").groupBy(0).reduceGroup(new MyReduceGroup)
    count = start.count()
    all = all + count
}
println("total nodes: " + all)

但是这种方法在最小的输入数据上非常慢,迭代版本需要<30秒,循环版本需要>3分钟。
我猜flink无法创建执行循环的最佳计划。
我应该试试什么解决办法?对flink的一些修改是否可以保存hadoop等的部分结果。?

olmpazwi

olmpazwi1#

不幸的是,目前无法从批量迭代输出中间结果。只能在迭代结束时输出最终结果。
另外,正如您正确地注意到的,flink不能有效地展开while循环或for循环,所以这也不起作用。
如果中间结果没有那么大,可以尝试将中间结果附加到部分解中,然后在迭代结束时输出所有结果。在transitiveclosurenaive示例中实现了类似的方法,其中在迭代中发现的路径在下一部分解中累积。

相关问题