Apache·Flink创造了错误的计划

cld4siwp  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(659)

我为ApacheFlink创建了一个简单的作业,它使用gelly提供的pagerank实现。
在ide内部运行,一切正常。但是,我尝试使用jobmanagerweb界面将包含作业的jar提交给在我的机器上运行的flink示例。但是,flink没有得到正确的作业计划并执行pagerank,而是提出并执行了一个非常奇怪的计划,只计算图的顶点数。
我做了一些研究和调试,发现gelly提供的pagerank的实现开始计算图的顶点数,但它没有作为算法的参数提供:

if (numberOfVertices == 0) {
    numberOfVertices = network.numberOfVertices();
}

此计算意味着一个嵌入式作业。由于运算符是惰性的,因此不会触发任何计算。在flink服务器中,首先要做的是获取作业计划。这是在一个特殊的环境下完成的, OptimizerPlanEnvironment ,它提供了以下内容 result 方法:

public JobExecutionResult execute(String jobName) throws Exception {
    Plan plan = createProgramPlan(jobName);
    this.optimizerPlan = compiler.compile(plan);

    // do not go on with anything now!
    throw new ProgramAbortException();
}

问题来自这里。一旦 ProgramAbortException 抛出时,程序返回到目前为止计算的计划。但是只计算了内部作业计划,因此这样就不会计算或执行主作业计划。
这是我使用的代码:

public class Job {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Graph<Long, Double, Double> graph = Graph.fromDataSet(
            PageRankData.getDefaultEdgeDataSet(env), new VertexInit(), env);
        graph.run(new PageRank<Long>(0.85, 10)).print();
    }

    private static class VertexInit implements MapFunction<Long, Double> {
        @Override
        public Double map(Long value) throws Exception { return 1.0; }
    }
}

如果提供了顶点数,则执行例如。 graph.run(new PageRank<Long>(0.85, 5, 10)) ,没有问题,正确计算了计划并计算了pagerank。
我的问题是:我做错了什么?或者这真的是Flink的虫子?

jq6vz3qz

jq6vz3qz1#

问题是,正如你所说的 network.numberOfVertices 内部通话 count 在顶点数据集上。这将触发一个独立的flink作业来计算计数值。此值通常由 main 方法。但是,在web客户端提交的情况下,由于 OptimizerPlanEnvironment ,它只允许编译单个flink作业。这种行为类似于分离的执行模式,它也不支持急切的计划执行。
这是目前flink的web客户端的一个限制。这种行为的原因是flink不想阻塞netty channel handler线程,这对于等待 count 操作。阻塞操作将使线程池处于饥饿状态,并使此会话的web接口在取消阻塞之前没有响应。

相关问题