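I'm implementing the connected components algorithm with Flink's DataStream API, since there is no implementation of it for this API yet.
For this algorithm I split the data with tumbling windows, and for each window I try to run the algorithm independently.
My problem comes from the iterative nature of the algorithm. I implemented the data pipeline needed for each iteration (the step pipeline), which consists of flatMaps, 1 join, 1 process window function and 1 filter. However, the stream I want to feed back into the loop does not seem to actually reach the beginning of the loop, because the algorithm never performs a second iteration. I suspect this is impossible when the original iterative stream is joined with another stream (even if the latter is derived from the former through a flatMap).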
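To make the intended per-window semantics concrete, here is a plain-Java sketch (no Flink involved) of min-label propagation over one window's adjacency lists. It assumes that every vertex starts with its own id as label and that `EmitVertexLabel` sends a vertex's current label to the vertex itself and to each neighbor; the class `WindowLabelPropagation` and its methods are hypothetical names for illustration only. The loop mirrors the step pipeline: emit labels, keep the minimum per vertex, lower labels, and repeat until a round changes nothing.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowLabelPropagation {

    /** Final labels of one window plus the number of rounds the loop ran. */
    static final class Result {
        final Map<Integer, Integer> labels;
        final int rounds;

        Result(Map<Integer, Integer> labels, int rounds) {
            this.labels = labels;
            this.rounds = rounds;
        }
    }

    /** Min-label propagation over an undirected adjacency list (one window's data). */
    static Result propagate(Map<Integer, List<Integer>> adjacency) {
        // Assumption: every vertex starts with its own id as label.
        Map<Integer, Integer> labels = new TreeMap<>();
        for (Integer v : adjacency.keySet()) {
            labels.put(v, v);
        }
        int rounds = 0;
        boolean changed = true;
        while (changed) {
            rounds++;
            // Emit step (like EmitVertexLabel + keyBy(0).minBy(1)): each vertex sends its
            // current label to itself and its neighbors; keep the minimum per receiver.
            Map<Integer, Integer> minReceived = new HashMap<>();
            for (Map.Entry<Integer, List<Integer>> e : adjacency.entrySet()) {
                int label = labels.get(e.getKey());
                minReceived.merge(e.getKey(), label, Integer::min);
                for (Integer neighbor : e.getValue()) {
                    minReceived.merge(neighbor, label, Integer::min);
                }
            }
            // Update step (like the join): lower a label when a smaller one arrived and
            // record that this round changed something, i.e. it has not converged yet.
            changed = false;
            for (Map.Entry<Integer, Integer> e : minReceived.entrySet()) {
                Integer current = labels.get(e.getKey());
                if (current == null || e.getValue() < current) {
                    labels.put(e.getKey(), e.getValue());
                    changed = true;
                }
            }
        }
        return new Result(labels, rounds);
    }

    public static void main(String[] args) {
        // Two components in one window: the path 1-2-3-4 and the edge 5-6.
        Map<Integer, List<Integer>> adjacency = new HashMap<>();
        adjacency.put(1, Arrays.asList(2));
        adjacency.put(2, Arrays.asList(1, 3));
        adjacency.put(3, Arrays.asList(2, 4));
        adjacency.put(4, Arrays.asList(3));
        adjacency.put(5, Arrays.asList(6));
        adjacency.put(6, Arrays.asList(5));

        Result result = propagate(adjacency);
        // prints {1=1, 2=1, 3=1, 4=1, 5=5, 6=5} after 4 rounds
        System.out.println(result.labels + " after " + result.rounds + " rounds");
    }
}
```

On the path 1-2-3-4 the minimum label needs three rounds to reach vertex 4, plus one more round to observe that nothing changed, so a single pass through the loop cannot produce a converged window.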
The code I'm using is the following:
import java.util.ArrayList;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// neigborsList = DataStream of <Vertex, [List of neighbors], label>
IterativeStream<Tuple3<Integer, ArrayList<Integer>, Integer>> beginning_loop = neigborsList.iterate(maxTimeout);

// Emits <Vertex, label> tuples for every vertex and each of its neighbors,
// then keeps the minimum label per vertex within each window
DataStream<Tuple2<Integer, Integer>> labels = beginning_loop
        // DataStream of <Vertex, label> for every neigborsList.f0 and every element in neigborsList.f1
        .flatMap(new EmitVertexLabel())
        .keyBy(0)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
        .minBy(1);

DataStream<Tuple4<Integer, ArrayList<Integer>, Integer, Integer>> updatedVertex = beginning_loop
        // Update the vertex label with the result of the label reduction.
        // The DataStream join only accepts KeySelectors, so both sides are keyed on the vertex id (f0)
        .join(labels)
        .where(new KeySelector<Tuple3<Integer, ArrayList<Integer>, Integer>, Integer>() {
            @Override
            public Integer getKey(Tuple3<Integer, ArrayList<Integer>, Integer> vertex) {
                return vertex.f0;
            }
        })
        .equalTo(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
            @Override
            public Integer getKey(Tuple2<Integer, Integer> label) {
                return label.f0;
            }
        })
        .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
        .apply(new JoinFunction<Tuple3<Integer, ArrayList<Integer>, Integer>, Tuple2<Integer, Integer>,
                Tuple4<Integer, ArrayList<Integer>, Integer, Integer>>() {
            @Override
            public Tuple4<Integer, ArrayList<Integer>, Integer, Integer> join(
                    Tuple3<Integer, ArrayList<Integer>, Integer> vertex, Tuple2<Integer, Integer> minLabel) {
                // f3 = 1: label unchanged; f3 = 0: label lowered in this iteration
                int hasConverged = 1;
                if (minLabel.f1 < vertex.f2) {
                    vertex.f2 = minLabel.f1;
                    hasConverged = 0;
                }
                return new Tuple4<>(vertex.f0, vertex.f1, vertex.f2, hasConverged);
            }
        })
        // Disseminates the convergence flag: the window has converged only if no label changed in it
        .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
        .process(new ProcessAllWindowFunction<Tuple4<Integer, ArrayList<Integer>, Integer, Integer>,
                Tuple4<Integer, ArrayList<Integer>, Integer, Integer>, TimeWindow>() {
            @Override
            public void process(Context ctx,
                                Iterable<Tuple4<Integer, ArrayList<Integer>, Integer, Integer>> values,
                                Collector<Tuple4<Integer, ArrayList<Integer>, Integer, Integer>> out) {
                int hasConverged = 1;
                for (Tuple4<Integer, ArrayList<Integer>, Integer, Integer> element : values) {
                    // f3 == 0 marks a vertex whose label changed, so the window has not converged yet
                    if (element.f3 == 0) {
                        hasConverged = 0;
                        break;
                    }
                }
                // Re-iterate and emit every element with the window-wide flag
                for (Tuple4<Integer, ArrayList<Integer>, Integer, Integer> element : values) {
                    element.f3 = hasConverged;
                    out.collect(element);
                }
            }
        });

DataStream<Tuple3<Integer, ArrayList<Integer>, Integer>> feed_back = updatedVertex
        .filter(new NotConvergedFilter())
        // Remove the convergence flag:
        // transforms the Tuple4s into Tuple3s so that they are compatible with beginning_loop
        .map(new RemoveConvergeceFlag());
beginning_loop.closeWith(feed_back);

// Selects the windows that have already converged
DataStream<?> convergedWindows = updatedVertex
        .filter(new ConvergedFilter());
convergedWindows.print()
        .setParallelism(1)
        .name("Sink to stdout");
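At the end of the execution, convergedWindows has not received any tuples (the algorithm cannot converge in just one iteration). If I print beginning_loop, I see the initial tuples of the first iteration and the tuples that were fed back, but nothing else.
So, to sum up my question: could this be a limitation of Flink? If so, do you know a different way to update the vertex labels after the initial reduction, one that is not based on a join?
Also, I'm using Flink 1.3.3.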