首先让我问一下我的问题,然后你能澄清一下我对应用方法的假设吗?
问题:如果我的应用程序每一分钟创建150万条(大约)记录,而flink job使用15++不同的运算符从kafka消费者处读取这些记录,那么这个逻辑可能会产生延迟、背压等(你可以假设平行度是16)
public class Sample{
//op1 =
kafkaSource
.keyBy(something)
.timeWindow(Time.minutes(1))
.apply(new ApplySomething())
.name("Name")
.addSink(kafkaSink);
//op2 =
kafkaSource
.keyBy(something2)
.timeWindow(Time.seconds(1)) // let's assume that this one second
.apply(new ApplySomething2())
.name("Name")
.addSink(kafkaSink);
// ...
//op16 =
kafkaSource
.keyBy(something16)
.timeWindow(Time.minutes(1))
.apply(new ApplySomething16())
.name("Name")
.addSink(kafkaSink);
}
// ..
public class ApplySomething ... {
private AnyObject object;
private int threshold = 30, 40, 100 ...;
@Override
public void open(Configuration parameters) throws Exception{
object = new AnyObject();
}
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Record> input, Collector<Result> out) throws Exception{
int counter = 0;
for (Record each : input){
counter += each.getValue();
if (counter > threshold){
out.collec(each.getResult());
return;
}
}
}
}
如果是,我应该使用带状态的平面图(rocksdb)而不是时间窗口吗?
我的预测是“是的”。让我解释一下为什么我会这样想:
如果parallelism是16,则会有16个不同的indivudal示例 ApplySomething1(), ApplySomething2()...ApplySomething16()
还有十六个 AnyObject()
per的示例 ApplySomething..()
班级。
应用程序工作时,如果 keyBy(something)
分区号大于16(假设我的应用程序有1.000.000个不同的 something
每天),然后 ApplySomething..()
示例将处理不同的键,因此只有一个 apply()
应该在处理之前等待其他循环。那么这会造成延迟吗?
1条答案
按热度按时间ff29svar1#
flink的时间窗口与epoch对齐(例如,如果你有一堆小时窗口,它们都会在小时触发)。因此,如果您确实希望在您的作业中有一堆不同的窗口,那么您应该将它们配置为具有不同的偏移量,这样它们就不会同时被触发。这样做会分散负载。看起来像这样
(或使用)
TumblingEventTimeWindows
视情况而定)。这将创建一分钟长的窗口,每分钟后15秒触发。只要用例允许,就应该使用增量聚合(通过
reduce
或者aggregate
),而不是使用WindowFunction
(或ProcessWindowFunction
)它必须先收集分配给列表中每个窗口的所有事件,然后再将它们作为一种小型批处理进行处理。如果您已将rocksdb配置为状态后端,则键控时间窗口将在rocksdb中保持其状态。你不需要改用
RichFlatMap
才能进入rocksdb(此外,由于flatmap不能使用计时器,因此我假设您最终会使用process函数。)当窗口操作符的任何并行示例忙于执行其窗口函数时(其中一个
ApplySomethings
)你认为这个任务不会做任何其他事情是正确的——因此它会(除非它很快完成)产生暂时的反压力。您需要根据需要增加并行性,以便作业可以满足吞吐量和延迟的要求。