flink并行相关对象示例&apply方法

ifmq2ha2  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(346)

首先让我问一下我的问题,然后你能澄清一下我对应用方法的假设吗?
问题:如果我的应用程序每一分钟创建150万条(大约)记录,而flink job使用15++不同的运算符从kafka消费者处读取这些记录,那么这个逻辑可能会产生延迟、背压等(你可以假设平行度是16)

public class Sample{
  //op1 = 
     kafkaSource
                .keyBy(something)
                .timeWindow(Time.minutes(1))
                .apply(new ApplySomething())
                .name("Name")
                          .addSink(kafkaSink);
  //op2 = 
    kafkaSource
                .keyBy(something2)
                .timeWindow(Time.seconds(1)) // let's assume that this one second
                .apply(new ApplySomething2())
                .name("Name")
                          .addSink(kafkaSink);
 // ...

  //op16 = 
    kafkaSource
                .keyBy(something16)
                .timeWindow(Time.minutes(1)) 
                .apply(new ApplySomething16())
                .name("Name")
                          .addSink(kafkaSink);

}
// ..
public class ApplySomething ... {
  private AnyObject object;
  private int threshold = 30, 40, 100 ...;

      @Override
    public void open(Configuration parameters) throws Exception{
        object = new AnyObject();
    }

    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Record> input, Collector<Result> out) throws Exception{
        int counter = 0;
        for (Record each : input){
          counter += each.getValue();
          if (counter > threshold){
            out.collec(each.getResult());
            return;
          }
        }
    }
}

如果是,我应该使用带状态的平面图(rocksdb)而不是时间窗口吗?
我的预测是“是的”。让我解释一下为什么我会这样想:
如果parallelism是16,则会有16个不同的indivudal示例 ApplySomething1(), ApplySomething2()...ApplySomething16() 还有十六个 AnyObject() per的示例 ApplySomething..() 班级。
应用程序工作时,如果 keyBy(something) 分区号大于16(假设我的应用程序有1.000.000个不同的 something 每天),然后 ApplySomething..() 示例将处理不同的键,因此只有一个 apply() 应该在处理之前等待其他循环。那么这会造成延迟吗?

ff29svar

ff29svar1#

flink的时间窗口与epoch对齐(例如,如果你有一堆小时窗口,它们都会在小时触发)。因此,如果您确实希望在您的作业中有一堆不同的窗口,那么您应该将它们配置为具有不同的偏移量,这样它们就不会同时被触发。这样做会分散负载。看起来像这样

.window(TumblingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(15))

(或使用) TumblingEventTimeWindows 视情况而定)。这将创建一分钟长的窗口,每分钟后15秒触发。
只要用例允许,就应该使用增量聚合(通过 reduce 或者 aggregate ),而不是使用 WindowFunction (或 ProcessWindowFunction )它必须先收集分配给列表中每个窗口的所有事件,然后再将它们作为一种小型批处理进行处理。
如果您已将rocksdb配置为状态后端,则键控时间窗口将在rocksdb中保持其状态。你不需要改用 RichFlatMap 才能进入rocksdb(此外,由于flatmap不能使用计时器,因此我假设您最终会使用process函数。)
当窗口操作符的任何并行示例忙于执行其窗口函数时(其中一个 ApplySomethings )你认为这个任务不会做任何其他事情是正确的——因此它会(除非它很快完成)产生暂时的反压力。您需要根据需要增加并行性,以便作业可以满足吞吐量和延迟的要求。

相关问题