不明白加入Flink

5kgi1eie  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(444)

来自Flink的官方文件:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/joining.html#interval-加入
示例代码为:

import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream
    .keyBy(elem => /* select key */)
    .intervalJoin(greenStream.keyBy(elem => /* select key */))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction[Integer, Integer, String] {
        override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = {
         out.collect(left + "," + right); 
        }
      });
    });

从上面的代码中,我想知道如何指定开始时间(例如,从今天开始)来执行这个间隔连接(开始时间之前的数据将不被考虑)。
例如,我已经运行了3天的程序,我不想在3天内对所有数据执行这个连接,我只想对今天生成的数据执行连接。

w51jfk4q

w51jfk4q1#

我觉得它不像你想的那样管用。
实际间隔是基于 orangeStream 在本例中,您并不是真正提供要考虑的数据的间隔,而是类似于窗口,它指定哪些元素将与橙色流的给定元素连接。
因此,对于上面描述的窗口,如果您有带有timestamp的橙色元素 5 ,则它将与具有来自的时间戳的元素连接 36 .
我真的不认为你可以用它来执行连接只与一些数据的一部分,我唯一能想到的是简单地过滤数据使用时间戳和过滤掉所有的元素,已经生成较早。

相关问题