Flink使用 Iterable
在某个操作员身上。我有一个问题:为什么用iterable代替list?是不是因为迭代对象可能不会立即放入内存?如果是的话,
当我需要把iterable中的所有元素都放到内存中时,我应该怎么做?
当我需要遍历两次时应该怎么做?
下面是详细的代码。在 apply
函数,我首先需要遍历 iterable
建立r树索引。然后我需要遍历的是再次查询。
public class AllIndexRangeJoinProcess extends RichWindowFunction<Tuple2<Boolean, Point>, List<Point>, Long, TimeWindow> {
private static final int RTREE_NODE_CAPACITY = 50;
private double distance;
private transient SpatialTreeIndex<Point> treeIndex;
public AllIndexRangeJoinProcess(double distance) {
this.distance = distance;
}
@Override
public void open(Configuration parameters) throws Exception {
treeIndex = new RTreeIndex<>(RTREE_NODE_CAPACITY);
}
@Override
public void apply(Long aLong,
TimeWindow timeWindow,
Iterable<Tuple2<Boolean, Point>> iterable,
Collector<List<Point>> collector) throws Exception {
// build index
List<Point> points = new ArrayList<>();
for (Tuple2<Boolean, Point> t : iterable) {
points.add(t.f1);
treeIndex.insert(t.f1);
}
// query
for (Point p : points) {
List<Point> bufferedPoints = new ArrayList<>();
bufferedPoints.add(p);
bufferedPoints.addAll(treeIndex.query(p.getBufferedEnvelope(distance)));
collector.collect(bufferedPoints);
}
}
}
暂无答案!
目前还没有任何答案,快来回答吧!