我在Flink有两条小溪 stream1
每秒有70000条记录 stream2
可能有也可能没有数据。
// Ingest the High Frequency Analog Stream
SingleOutputStreamOperator<FlatHighFrequencyAnalog> stream1 =
environment
.addSource(createHFAConsumer())
.name("hfa source");
SingleOutputStreamOperator<EVWindow> stream2 = environment
.addSource(createHFDConsumer())
.name("hfd source");
DataStream<Message> pStream =
stream1
.coGroup(stream2)
.where(obj -> obj.getid())
.equalTo(ev -> ev.getid())
.window(TumblingEventTimeWindows.of(Time.minutes(Constants.VALIDTY_WINDOW_MIN)))
.evictor(TimeEvictor.of(Time.minutes(Constants.VALIDTY_WINDOW_MIN)))
.apply(new CalculateCoGroupFunction());
当两个流都有数据时,这工作得非常好,但是当stream2没有数据时,作业会失败,背压非常高。cpu利用率也会激增200%。
在这种情况下如何处理外部连接
3条答案
按热度按时间evrscar21#
谢谢大卫·安德森的指点
区域协调机构:
当我试图在我的流周围创建一个翻滚窗口时,主要的问题出现了。
根据flink文件
简而言之,当第一个应该属于这个窗口的元素到达时,就会创建一个窗口,当时间(事件或处理时间)经过它的结束时间戳加上用户指定的允许延迟时,这个窗口就会被完全删除
因为没有传入的数据
stream2
Windows从来没有出现过。正如大卫所指出的每当连接多个流时,得到的水印是传入水印的最小值
也就是说flink在缓冲
stream1
在等待的时候stream2
最终会导致高背压和最终的oom。解决方案:
我创建了一个外部脚本来向kafka流发送虚拟心跳消息
stream2
在我的应用程序中添加了忽略这些消息进行计算的逻辑。这迫使
stream2
以及stream1
为了推进水印和窗口被删除的上下文。0yg35tkg2#
我认为问题在于,空闲流中缺少水印阻碍了整体水印。每当连接多个流时,得到的水印是传入水印的最小值。这会导致像你所经历的那样的问题。
你有几个选择:
为设置水印
stream2
成为Watermark.MAX_WATERMARK
,从而stream1
完全控制水印。不知怎么发现的
stream2
是空闲的,并且在缺少事件的情况下人为地推进水印。下面是一个例子。huus2vyu3#
如前所述:
每当连接多个流时,得到的水印是传入水印的最小值
和
这意味着flink在等待stream2时缓冲了stream1的数据,最终会导致高背压和oom。
它对你有用
coGroup()
方法从DataStream<T>
返回的类CoGroupedStreams<T, T2>
.为了避免这种行为,我们可以使用
union(DataStream<T>... streams)
方法返回一个简单的DataStream<T>
水印会像平常的溪流一样前进。我们需要解决的唯一问题是两个流都有一个公共模式(类)。我们可以使用带有两个字段的聚合类:
另外,一种更通用的方法是
Either<L, R>
上课地点org.apache.flink.types
.让我们总结一下最后的内容:
在进程函数中获取不同的集合