flink中元素索引的自然方法

sycxhyv7  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(408)

是否有一种内置的方法来索引和访问数据流/数据集集合的各个元素的索引?
就像在典型的java集合中一样,您知道arraylist的第三个元素可以通过 ArrayList.get(2) 反之亦然 ArrayList.indexOf(elem) 给出指定元素(第一次出现)的索引(我不是要从流中提取元素。)
更具体地说,当 join 对于数据流/数据集,是否有一种“自然的”/简单的方法来连接第一个、第二个等元素。?
我知道有一个 zipWithIndex 为元素指定顺序索引的变换。我怀疑索引总是以0开头?但我也怀疑它们不一定是按照元素创建的顺序(即按事件时间)分配的(它也只存在于数据集。)这是我目前尝试的:

DataSet<Tuple2<Long, Double>> tempsJoIndexed = DataSetUtils.zipWithIndex(tempsJo);
DataSet<Tuple2<Long, Double>> predsLinJoIndexed = DataSetUtils.zipWithIndex(predsLinJo);
DataSet<Tuple3<Double, Double, Double>> joinedTempsJo = tempsJoIndexed
                .join(predsLinJoIndexed).where(0).equalTo(0)...

它似乎产生了错误的配对。
我看到了一些可能的方法,但它们要么不灵活,要么不是很好:
当然,我可以在创建流时为每个元素分配一个索引,例如,一个 Tuple s。
使用事件时间戳(我怀疑没有一种按时间戳设置密钥的方法,即使有,也不会对连接这样的多个流有用,除非时间戳实际上被指定为索引。)
我们可以试试” collect 先把“溪流”弄出来,然后我们就不用Flink了。
第一个。这种方法似乎是最可行的方法,但也似乎是多余的,因为根据定义,流应该是一个连续的集合,因此,元素应该有一种有序感(例如,“我是第36个元素,因为在我之前已经有35个元素了。”。

vhipe2zx

vhipe2zx1#

我认为您必须将索引值分配给元素,这样您就可以用这个索引对数据集进行分区,从而确保需要连接的两个记录被同一个子任务处理。一旦你做到了,一个简单的 groupBy(index) 以及 reduce() 会有用的。
但是,如果您想以并行度>1的方式读取源数据,那么在没有间隙的情况下分配越来越多的id并不是一件小事。如果那样的话,我会创建一个 RichMapFunction 它使用runtimecontext子任务id和子任务数来计算非重叠和单调索引。

相关问题