来自Flink的官方文件:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/joining.html#interval-加入
示例代码为:
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...
orangeStream
.keyBy(elem => /* select key */)
.intervalJoin(greenStream.keyBy(elem => /* select key */))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process(new ProcessJoinFunction[Integer, Integer, String] {
override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = {
out.collect(left + "," + right);
}
});
});
从上面的代码中,我想知道如何指定开始时间(例如,从今天开始)来执行这个间隔连接(开始时间之前的数据将不被考虑)。
例如,我已经运行了3天的程序,我不想在3天内对所有数据执行这个连接,我只想对今天生成的数据执行连接。
1条答案
按热度按时间w51jfk4q1#
我觉得它不像你想的那样管用。
实际间隔是基于
orangeStream
在本例中,您并不是真正提供要考虑的数据的间隔,而是类似于窗口,它指定哪些元素将与橙色流的给定元素连接。因此,对于上面描述的窗口,如果您有带有timestamp的橙色元素
5
,则它将与具有来自的时间戳的元素连接3
至6
.我真的不认为你可以用它来执行连接只与一些数据的一部分,我唯一能想到的是简单地过滤数据使用时间戳和过滤掉所有的元素,已经生成较早。