Apache Flink iterative stream does not loop

Posted by fumotvh3 on 2021-06-24 in Flink

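I am implementing the connected components algorithm with Flink's DataStream API, because no implementation using this API exists yet.
For this algorithm, I split the data with tumbling windows, so for each window I try to compute the algorithm independently.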
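
For reference, the computation intended for each window can be sketched in plain Java without Flink. This is only a minimal sketch under assumptions: the class name `MinLabelPropagation` and the map-based graph are hypothetical, the graph is assumed to be undirected (each edge listed in both directions), and every vertex starts with its own id as its label.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink): min-label propagation over the graph of one window.
// The class name and the map-based representation are illustrative assumptions.
class MinLabelPropagation {

    // adjacency maps a vertex to its neighbor list.
    // Returns vertex -> smallest label that reached it along the listed edges.
    static Map<Integer, Integer> run(Map<Integer, List<Integer>> adjacency) {
        // Every vertex (and every neighbor that has no entry of its own)
        // starts with its own id as label.
        Map<Integer, Integer> labels = new HashMap<>();
        for (Map.Entry<Integer, List<Integer>> e : adjacency.entrySet()) {
            labels.putIfAbsent(e.getKey(), e.getKey());
            for (Integer n : e.getValue()) {
                labels.putIfAbsent(n, n);
            }
        }

        boolean changed = true;
        while (changed) {
            changed = false;

            // Step 1: each vertex sends its label to itself and to each neighbor;
            // keep the minimum label received per target vertex.
            Map<Integer, Integer> candidate = new HashMap<>(labels);
            for (Map.Entry<Integer, List<Integer>> e : adjacency.entrySet()) {
                int label = labels.get(e.getKey());
                for (Integer n : e.getValue()) {
                    candidate.merge(n, label, Math::min);
                }
            }

            // Step 2: adopt the smaller label; any change means another round is needed.
            for (Map.Entry<Integer, Integer> c : candidate.entrySet()) {
                if (c.getValue() < labels.get(c.getKey())) {
                    labels.put(c.getKey(), c.getValue());
                    changed = true;
                }
            }
        }
        return labels;
    }
}
```

The `while (changed)` loop plays the role that the feedback edge is supposed to play in the streaming version: a round that changes at least one label must be followed by another round.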
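My problem comes from the iterative nature of the algorithm. I have implemented the data pipeline needed for one iteration (the step pipeline), which consists of flatMaps, one join, one process window function, and one filter. However, the stream I want to feed back into the loop does not seem to reach the head of the loop, because the algorithm does not iterate. I suspect that this is not possible when the original iterative stream is joined with another stream, even if the latter is derived from the former through a flatMap.
The code I am using is as follows: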

//neigborsList = Datastream of <Vertex, [List of neighbors], label>
IterativeStream< Tuple3<Integer, ArrayList<Integer>, Integer> > beginning_loop = neigborsList.iterate(maxTimeout);

//Emits tuples Vertices and Labels for every vertex and its neighbors

DataStream<Tuple2<Integer,Integer> > labels = beginning_loop
        //Datastream of <Vertex, label> for every neigborsList.f0 and element in neigborsList.f1
        .flatMap( new EmitVertexLabel() ) 
        .keyBy(0)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
        .minBy(1)               
        ;

DataStream<Tuple4<Integer, ArrayList<Integer>, Integer, Integer>> updatedVertex = beginning_loop                
            //Update vertex label with the results from the labels reduction
            .join(labels)
            .where("vertex")
            .equalTo("vertex")
            .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
            .apply(new JoinFunction<Tuple3<Integer,ArrayList<Integer>,Integer>, Tuple2<Integer,Integer>, Tuple4<Integer,ArrayList<Integer>,Integer,Integer>>() {

                @Override
                public Tuple4<Integer,ArrayList<Integer>,Integer,Integer> join(
                        Tuple3<Integer, ArrayList<Integer>, Integer> arg0, Tuple2<Integer, Integer> arg1)
                        throws Exception {
                    int hasConverged = 1;
                    if(arg1.f1.intValue() < arg0.f2.intValue() )
                    {
                        arg0.f2 = arg1.f1;
                        hasConverged=0;
                    }
                    return new Tuple4<>(arg0.f0,arg0.f1,arg0.f2,new Integer(hasConverged));
                }                       

            })

            //Disseminates the convergence flag if a change was made in the window
            .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
            .process(new ProcessAllWindowFunction<Tuple4<Integer,ArrayList<Integer>,Integer,Integer>,Tuple4<Integer, ArrayList<Integer>, Integer, Integer>,TimeWindow >() {

                @Override
                public void process(
                        ProcessAllWindowFunction<Tuple4<Integer, ArrayList<Integer>, Integer, Integer>, Tuple4<Integer, ArrayList<Integer>, Integer, Integer>, TimeWindow>.Context ctx,
                        Iterable<Tuple4<Integer, ArrayList<Integer>, Integer, Integer>> values,
                        Collector<Tuple4<Integer, ArrayList<Integer>, Integer, Integer>> out) throws Exception {

                    Iterator<Tuple4<Integer, ArrayList<Integer>, Integer, Integer>> iterator = values.iterator();
                    Tuple4<Integer, ArrayList<Integer>, Integer, Integer> element;

                    int hasConverged= 1;
                    while(iterator.hasNext())
                    {
                        element = iterator.next();
                        if(element.f3.intValue()>0)
                        {
                            hasConverged=0;
                            break;
                        }

                    }

                    //Re iterate and emit the values on the correct output
                    iterator = values.iterator();                           
                    Integer converged = new Integer(hasConverged);
                    while(iterator.hasNext())
                    {
                        element = iterator.next();
                        element.f3 = converged;
                        out.collect(element);

                    }                                                   
                }
            })              

            ;

DataStream<Tuple3<Integer, ArrayList<Integer>, Integer>> feed_back = updatedVertex
        .filter(new NotConvergedFilter())                               
        //Remove the finished convergence flag
        //Transforms the Tuples4 to Tuples3 so that it becomes compatible with beginning_loop
        .map(new RemoveConvergeceFlag())
        ;

beginning_loop.closeWith(feed_back);

//Selects the windows that have already converged
DataStream<?> convergedWindows = updatedVertex
        .filter(new ConvergedFilter() );

convergedWindows.print()
.setParallelism(1)
.name("Sink to stdout");

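At the end of the execution, convergedWindows receives no tuples (because the algorithm cannot converge in a single iteration). If I print beginning_loop, I see the initial tuples of the first iteration and the tuples that were fed back, but nothing more.
So, to summarize my question: could this be a limitation of Flink? If so, do you know a different way to update the vertex labels after the initial reduction, one that is not based on a join?
Also, I am using Flink 1.3.3.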
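
To make the kind of join-free update I have in mind more concrete, here is a minimal plain-Java sketch (no Flink). Each vertex sends its full record to itself and a label-only message to each neighbor, so a single grouping by target vertex sees both the record and all candidate labels. All names (`JoinFreeStep`, `VertexRecord`, `Message`, `StepResult`) are hypothetical, and whether this shape maps cleanly onto a flatMap followed by a keyed window function inside an IterativeStream is an assumption I have not verified.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Flink): one join-free update step. All type names are hypothetical.
class JoinFreeStep {

    // <vertex, neighbors, label>, mirroring the Tuple3 used in the question.
    static final class VertexRecord {
        final int vertex; final List<Integer> neighbors; final int label;
        VertexRecord(int vertex, List<Integer> neighbors, int label) {
            this.vertex = vertex; this.neighbors = neighbors; this.label = label;
        }
    }

    // A label addressed to a target vertex. neighbors is non-null only when
    // a vertex sends its own record to itself.
    static final class Message {
        final int target; final int label; final List<Integer> neighbors;
        Message(int target, int label, List<Integer> neighbors) {
            this.target = target; this.label = label; this.neighbors = neighbors;
        }
    }

    static final class StepResult {
        final List<VertexRecord> updated; final boolean converged;
        StepResult(List<VertexRecord> updated, boolean converged) {
            this.updated = updated; this.converged = converged;
        }
    }

    // Emits the vertex's own record to itself plus one label message per neighbor.
    static List<Message> emit(VertexRecord r) {
        List<Message> out = new ArrayList<>();
        out.add(new Message(r.vertex, r.label, r.neighbors));
        for (int n : r.neighbors) {
            out.add(new Message(n, r.label, null));
        }
        return out;
    }

    // Groups all messages by target vertex and updates each record in that one pass.
    static StepResult step(List<VertexRecord> records) {
        Map<Integer, List<Message>> byTarget = new TreeMap<>();
        for (VertexRecord r : records) {
            for (Message m : emit(r)) {
                byTarget.computeIfAbsent(m.target, k -> new ArrayList<>()).add(m);
            }
        }
        List<VertexRecord> updated = new ArrayList<>();
        boolean converged = true;
        for (Map.Entry<Integer, List<Message>> e : byTarget.entrySet()) {
            List<Integer> neighbors = null;
            int own = Integer.MAX_VALUE;
            int min = Integer.MAX_VALUE;
            for (Message m : e.getValue()) {
                min = Math.min(min, m.label);
                if (m.neighbors != null) { neighbors = m.neighbors; own = m.label; }
            }
            if (neighbors == null) continue; // target has no record of its own: skip it
            if (min < own) converged = false; // a label changed, so another round is needed
            updated.add(new VertexRecord(e.getKey(), neighbors, min));
        }
        return new StepResult(updated, converged);
    }
}
```

In this sketch the `converged` flag corresponds to the flag that my ProcessAllWindowFunction disseminates, and `updated` is what would be fed back while the flag is false.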
