假设你在做一个大的flink项目。你也是 keyBy
客户的客户端ip地址。
意识到你要在不同的代码中过滤相同的东西,比如:
public void calculationOne(){
kafkaSource.filter(isContainsSmthA).keyBy(clientip).process(processA).sink(...);
}
public void calculationTwo(){
kafkaSource.filter(isContainsSmthA).keyBy(clientip).process(processB).sink(...);
}
假设他们很多 kafkaSource.filter(isContainsSmthA)..
现在这个结构导致了flink中的性能问题?
如果我像下面这样做,会更好吗?
public Stream filteredA(){
return kafkaSource.filter(isContainsSmthA);
public void calculationOne(){
filteredA().keyBy(clientip).process(processA).sink(...);
}
public void calculationTwo(){
filteredA().keyBy(clientip).process(processB).sink(...);
}
2条答案
按热度按时间rkue9o1l1#
我可以像下面这样做,一个简单的 Package 器操作符可以通过两个不同的函数运行数据,并生成两个输出端。
虽然我不知道这和阿维德的建议有什么不同。
6qftjkof2#
这在一定程度上取决于它在操作上的表现。
第一种方法是对kafka集群更友好:所有记录读取一次。过滤器本身是一个非常便宜的操作,所以你不需要太担心它。然而,这种方法的一大缺点是,如果一种计算速度比另一种慢得多,就会减慢它们的速度。如果您不处理历史事件,那么不管怎样,调整应用程序集群的大小以跟上所有事件的发展都不重要。当前的另一个不利因素是,如果你在
calculationTwo
还包括中的任务calculationOne
重新启动。不过,社区正在积极努力缓解这一问题。第二种方法只允许受影响的源->…->重新启动接收器子策略。因此,如果您希望频繁地重新启动或需要保证某些sla,那么这种方法更好。扩展实际上是为每一条管道提供单独的flink应用程序。您可以共享同一个jar,但在提交时使用不同的参数来选择正确的管道。这种方法还使应用程序的更新更加容易,因为您只会遇到实际修改的管道的停机时间。