我需要将一个整数立方体拆分为向量,对每个向量执行一些操作(比如简单的加法),然后将向量合并回一个立方体。向量操作应并行执行(即,每个流的向量)。多维数据集是包含id的对象。
我可以将多维数据集拆分为向量,使用多维数据集id创建一个元组,然后使用keyby(id),并为每个多维数据集的向量创建一个分区。然而,似乎我必须使用一个窗口的某个时间单位来做到这一点。这个应用程序对延迟非常敏感,所以我更喜欢在向量到达时组合它们,也许使用某种逻辑时钟(我知道一个多维数据集中有多少个向量),当最后一个向量到达时,将重新组装的多维数据集发送到下游。这在Flink可能吗?
下面是一段代码片段,举例说明了这个想法:
//Stream topology..
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Cube> stream = env
//Take cubes from collection and send downstream
.fromCollection(cubes)
//Split the cube(int[][][]) to vectors(int[]) and send downstream
.flatMap(new VSplitter()) //returns tuple with id at pos 1
.keyBy(1)
//For each value in each vector element, add its value with one.
.map(new MapFunction<Tuple2<CubeVector, Integer>, Tuple2<CubeVector, Integer>>() {
@Override
public Tuple2<CubeVector, Integer> map(Tuple2<CubeVector, Integer> cVec) throws Exception {
CubeVector cv = cVec.getField(0);
cv.cubeVectorAdd(1);
cVec.setField(cv, 0);
return cVec;
}
})
//**Merge vectors back to a cube**//
.
.
.
//The cube splitter to vectors..
public static class VSplitter implements FlatMapFunction<Cube, Tuple2<CubeVector, Integer>> {
@Override
public void flatMap(Cube cube, Collector<Tuple2<CubeVector, Integer>> out) throws Exception {
for (CubeVector cv : cubeVSplit(cube)) {
//out.assignTimestamp()
out.collect(new Tuple2<CubeVector, Integer>(cv, cube.getId()));
}
}
}
1条答案
按热度按时间sc4hvdpw1#
你需要一个
FlatMapFunction
它不断地附加CubeVectors
直到它看到足够的CubeVectors
重建Cube
. 下面的代码段应该可以做到这一点: