flink-co组外部连接失败,背压高

vlju58qv  于 2021-06-21  发布在  Flink
关注(0)|答案(3)|浏览(428)

我在Flink有两条小溪 stream1 每秒有70000条记录 stream2 可能有也可能没有数据。

// Ingest the High Frequency Analog Stream
SingleOutputStreamOperator<FlatHighFrequencyAnalog> stream1 =
    environment
        .addSource(createHFAConsumer())
        .name("hfa source");

SingleOutputStreamOperator<EVWindow> stream2 = environment
        .addSource(createHFDConsumer())
        .name("hfd source");

DataStream<Message> pStream =
        stream1
        .coGroup(stream2)
        .where(obj -> obj.getid())
        .equalTo(ev -> ev.getid())
            .window(TumblingEventTimeWindows.of(Time.minutes(Constants.VALIDTY_WINDOW_MIN)))
            .evictor(TimeEvictor.of(Time.minutes(Constants.VALIDTY_WINDOW_MIN)))
        .apply(new CalculateCoGroupFunction());

当两个流都有数据时,这工作得非常好,但是当stream2没有数据时,作业会失败,背压非常高。cpu利用率也会激增200%。
在这种情况下如何处理外部连接

evrscar2

evrscar21#

谢谢大卫·安德森的指点
区域协调机构:
当我试图在我的流周围创建一个翻滚窗口时,主要的问题出现了。
根据flink文件
简而言之,当第一个应该属于这个窗口的元素到达时,就会创建一个窗口,当时间(事件或处理时间)经过它的结束时间戳加上用户指定的允许延迟时,这个窗口就会被完全删除
因为没有传入的数据 stream2 Windows从来没有出现过。正如大卫所指出的
每当连接多个流时,得到的水印是传入水印的最小值
也就是说flink在缓冲 stream1 在等待的时候 stream2 最终会导致高背压和最终的oom。
解决方案:
我创建了一个外部脚本来向kafka流发送虚拟心跳消息 stream2 在我的应用程序中添加了忽略这些消息进行计算的逻辑。
这迫使 stream2 以及 stream1 为了推进水印和窗口被删除的上下文。

0yg35tkg

0yg35tkg2#

我认为问题在于,空闲流中缺少水印阻碍了整体水印。每当连接多个流时,得到的水印是传入水印的最小值。这会导致像你所经历的那样的问题。
你有几个选择:
为设置水印 stream2 成为 Watermark.MAX_WATERMARK ,从而 stream1 完全控制水印。
不知怎么发现的 stream2 是空闲的,并且在缺少事件的情况下人为地推进水印。下面是一个例子。

huus2vyu

huus2vyu3#

如前所述:

每当连接多个流时,得到的水印是传入水印的最小值

这意味着flink在等待stream2时缓冲了stream1的数据,最终会导致高背压和oom。
它对你有用 coGroup() 方法从 DataStream<T> 返回的类 CoGroupedStreams<T, T2> .
为了避免这种行为,我们可以使用 union(DataStream<T>... streams) 方法返回一个简单的 DataStream<T> 水印会像平常的溪流一样前进。
我们需要解决的唯一问题是两个流都有一个公共模式(类)。我们可以使用带有两个字段的聚合类:

public class Aggregator {

  private FlatHighFrequencyAnalog flatHighFrequencyAnalog;
  private EVWindow evWindow;

  public Aggregator(FlatHighFrequencyAnalog flatHighFrequencyAnalog) {
    this.flatHighFrequencyAnalog = flatHighFrequencyAnalog;
  }

  public Aggregator(EVWindow evWindow) {
    this.evWindow = evWindow;
  }

  public FlatHighFrequencyAnalog getFlatHighFrequencyAnalog() {
    return flatHighFrequencyAnalog;
  }

  public EVWindow getEVWindow() {
    return evWindow;
  }
}

另外,一种更通用的方法是 Either<L, R> 上课地点 org.apache.flink.types .

让我们总结一下最后的内容:

SingleOutputStreamOperator<Either<EVWindow, FlatHighFrequencyAnalog>> stream1 =
    environment
        .addSource(createHFAConsumer())
        .map(hfa -> Either.Left(hfa));

SingleOutputStreamOperator<Either<EVWindow, FlatHighFrequencyAnalog>> stream2 = 
    environment
        .addSource(createHFDConsumer())
        .map(hfd -> Either.Right(hfd));

DataStream<Message> pStream =
        stream1
          .union(stream2)
          .assignTimestampsAndWatermarks(
              WatermarkStrategy
                  .<Either<EVWindow, FlatHighFrequencyAnalog>>forBoundedOutOfOrderness(
                    ofSeconds(MAX_OUT_OF_ORDERNESS))
                .withTimestampAssigner((input, timestamp) -> input.isLeft() ? input.left().getTimeStamp() : input.right().getTimeStamp()))
          .keyBy(value -> value.isLeft() ? value.left().getId() : value.right().getId())
          .window(TumblingEventTimeWindows.of(Time.minutes(MINUTES)))
          .process(new ProcessWindowFunction());

在进程函数中获取不同的集合

List<EVWindow> evWindows =
        Streams.stream(elements)
            .filter(Either::isLeft)
            .map(Either::left)
            .collect(Collectors.toList());

List<FlatHighFrequencyAnalog> highFrequencyAnalogs =
        Streams.stream(elements)
            .filter(Either::isRight)
            .map(Either::right)
            .collect(Collectors.toList());

相关问题