在flink中,如何多次遍历iterable对象

krcsximq  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(489)

Flink使用 Iterable 在某个操作员身上。我有一个问题:为什么用iterable代替list?是不是因为迭代对象可能不会立即放入内存?如果是的话,
当我需要把iterable中的所有元素都放到内存中时,我应该怎么做?
当我需要遍历两次时应该怎么做?
下面是详细的代码。在 apply 函数,我首先需要遍历 iterable 建立r树索引。然后我需要遍历的是再次查询。

public class AllIndexRangeJoinProcess extends RichWindowFunction<Tuple2<Boolean, Point>, List<Point>, Long, TimeWindow> {

  private static final int RTREE_NODE_CAPACITY = 50;

  private double distance;
  private transient SpatialTreeIndex<Point> treeIndex;

  public AllIndexRangeJoinProcess(double distance) {
    this.distance = distance;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    treeIndex = new RTreeIndex<>(RTREE_NODE_CAPACITY);
  }

  @Override
  public void apply(Long aLong,
                    TimeWindow timeWindow,
                    Iterable<Tuple2<Boolean, Point>> iterable,
                    Collector<List<Point>> collector) throws Exception {
    // build index
    List<Point> points = new ArrayList<>();
    for (Tuple2<Boolean, Point> t : iterable) {
      points.add(t.f1);
      treeIndex.insert(t.f1);
    }
    // query
    for (Point p : points) {
      List<Point> bufferedPoints = new ArrayList<>();
      bufferedPoints.add(p);
      bufferedPoints.addAll(treeIndex.query(p.getBufferedEnvelope(distance)));
      collector.collect(bufferedPoints);
    }
  }
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题