flink流作业未按预期扩展

h6my8fg2  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(325)

我们正在测试flink的缩放能力。但我们发现,无论是增加更多的插槽还是增加任务管理器的数量,伸缩都不起作用。我们希望线性,如果不是接近线性缩放性能,但结果甚至显示退化。感谢您的评论。
测试细节,
-vmware vsphere
-只是一个简单的通过测试,

- auto gen source 3mil records, each 1kb in size, parallelism=1

- source pass into next map operator, which just return the same record, and sent counter to statsD, parallelism is in cases = 2,4,6

3个tm,共6个插槽(2个/tm)每个jm/tm有32个vcpu,100gb内存
结果:
2个插槽:26秒,3mil/26=115k tps
4个插槽:23秒,3mil/23=130k tps
6个插槽:22秒,3mil/22=136k tps
如图所示,缩放比例几乎为零。有什么线索吗?谢谢。

v1uwarro

v1uwarro1#

请参考样本代码,

public class passthru extends RichMapFunction<String, String> {
        public void open(Configuration configuration) throws Exception {
        ... ... 
            stats = new NonBlockingStatsDClient();
        }
        public String map(String value) throws Exception { 
            ... ...
            stats.increment(); 
            return value;
        }
    }

    public class datagen extends RichSourceFunction<String> {
        ... ...
        public void run(SourceContext<String> ctx) throws Exception {
            int i = 0;
            while (run){
                String idx = String.format("%09d", i);
                ctx.collect("{\"<a 1kb json content with idx in certain json field>\"}");
                i++;
                if(i == loop) 
                    run = false;
            }
        }
        ... ...
    }
    public class Job {
        public static void main(String[] args) throws Exception {
        ... ...
            DataStream<String> stream = env.addSource(new datagen(loop)).rebalance();
            DataStream<String> convert = stream.map(new passthru(statsdUrl));
            env.execute("Flink");
        } 
    }

还原状态代码,

dataStream.flatMap(xxx).keyBy(new KeySelector<xxx, AggregationKey>() {
        public AggregationKey getKey(rec r) throws Exception {
            ... ...             
           }
        }).process(new Aggr());

    public class Aggr extends ProcessFunction<rec, rec> {
        private ReducingState<rec> store;
        public void open(Configuration parameters) throws Exception {
            store= getRuntimeContext().getReducingState(new ReducingStateDescriptor<>(
                "reduction store", new ReduceFunction<rec>() {
            ... ...
        }
    public void processElement(rec r, Context ctx, Collector<rec> out)
        throws Exception {
            ... ...
            store.add(r);
yqkkidmi

yqkkidmi2#

你真的应该使用richparallelsourcefunction。如果您关心使源的不同示例的记录不同,那么可以从runtimecontext获取每个示例的索引,runtimecontext可通过richfunction接口中的getruntimecontext()方法获得。
另外,flink有一个内置的statsd metrics reporter,您应该使用它,而不是自己滚动。此外,numrecordsin、numrecordsout、numrecordsinpersecond和numrecordsoutpersecond已经在为您计算了,所以不需要自己创建这个工具。您还可以通过flink的web界面或RESTAPI访问这些度量。
至于为什么kafka消费者的可伸缩性很差,可能有很多原因。如果您使用的是事件时处理,那么空闲分区可能会阻碍您的工作(请参阅https://issues.apache.org/jira/browse/flink-5479). 如果流是键控的,那么数据倾斜可能是一个问题。如果要连接到外部数据库或服务,则很容易成为瓶颈。如果检查点配置错误,可能会导致这种情况。或网络容量不足。
我将通过查看FlinkWebUI中的一些关键指标来开始调试。负载在子任务之间是否平衡良好,或者是否有偏差?您可以打开延迟跟踪,并查看kafka分区中是否有一个分区行为不正常(通过检查sink处的延迟,每个分区都会报告延迟)。你可以寻找背压。

相关问题