过滤器性能提示

3pvhb19x  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(286)

假设你在做一个大的flink项目。你也是 keyBy 客户的客户端ip地址。
意识到你要在不同的代码中过滤相同的东西,比如:

public void calculationOne(){
   kafkaSource.filter(isContainsSmthA).keyBy(clientip).process(processA).sink(...);
}
public void calculationTwo(){
   kafkaSource.filter(isContainsSmthA).keyBy(clientip).process(processB).sink(...);
}

假设他们很多 kafkaSource.filter(isContainsSmthA).. 现在这个结构导致了flink中的性能问题?
如果我像下面这样做,会更好吗?

public Stream filteredA(){
   return kafkaSource.filter(isContainsSmthA);
public void calculationOne(){
   filteredA().keyBy(clientip).process(processA).sink(...);
}
public void calculationTwo(){
   filteredA().keyBy(clientip).process(processB).sink(...);
}
rkue9o1l

rkue9o1l1#

我可以像下面这样做,一个简单的 Package 器操作符可以通过两个不同的函数运行数据,并生成两个输出端。

SingleOutputStreamOperator comboResults = kafkaSource
    .filter(isContainsSmthA)
    .keyBy(clientip)
    .process(new MyWrapperFunction(processA, processB));

comboResults
    .getSideOutput(processATag)
    .sink(...);

comboResults
    .getSideOutput(processBTag)
    .sink(...);

虽然我不知道这和阿维德的建议有什么不同。

6qftjkof

6qftjkof2#

这在一定程度上取决于它在操作上的表现。
第一种方法是对kafka集群更友好:所有记录读取一次。过滤器本身是一个非常便宜的操作,所以你不需要太担心它。然而,这种方法的一大缺点是,如果一种计算速度比另一种慢得多,就会减慢它们的速度。如果您不处理历史事件,那么不管怎样,调整应用程序集群的大小以跟上所有事件的发展都不重要。当前的另一个不利因素是,如果你在 calculationTwo 还包括中的任务 calculationOne 重新启动。不过,社区正在积极努力缓解这一问题。
第二种方法只允许受影响的源->…->重新启动接收器子策略。因此,如果您希望频繁地重新启动或需要保证某些sla,那么这种方法更好。扩展实际上是为每一条管道提供单独的flink应用程序。您可以共享同一个jar,但在提交时使用不同的参数来选择正确的管道。这种方法还使应用程序的更新更加容易,因为您只会遇到实际修改的管道的停机时间。

相关问题