java—flink作业每1:45小时的cpu负载就会增加

zsohkypk  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(297)

在上一个问题(这里)中,我问了为什么我的flink 1.10.1作业会使cpu负载在运行一小时后开始增加,而不会下降,然后,我开始用visualvm、jprofiler和memoryanalyzer监视我的作业,我发现了一些错误,包括不必要的对象创建、hashmaps和其他一些已修复的问题。然后,内存消耗和cpu负载都很低,但是cpu负载几乎每过1:45小时就会出现一次类似于“丘陵”的现象。
我有与以前相同的执行图,但我也将把它放在这里:

DataStream<Event> from_source = rabbitConsumer
                .flatMap(new RabbitMQConsumer())
                .assignTimestampsAndWatermarks(new PeriodicExtractor());
SingleOutputStreamOperator<Event> data_stream = from_source 
                    .filter(new NullidsFilterFunction())
KeyedStream<String, Event> keyed_stream = data_stream.keyby(k->k.id);

/*one-> stateful operator: ProcessWindowFunction*/
data_stream.map(new EventCount(x))
            .keyBy(k -> new Date(k.timestamp.getTime()).toString())
            .window(TumblingEventTimeWindows.of(Time.ninutes(30)))
            .process(new MyProcessWindowFunction())
            .addSink(new SinkFuncion());

/*two-> stateful operator: ProcessWindowFunction*/
keyed_stream.window(TumblingEventTimeWindows.of(Time.ninutes(10)))
            .process(new MyProcessWindowFunction())
            .addSink(new SinkFuncion());

/*three*/
keyed_stream.filter(new FilterFunction())
            .map(new MapClass())
            .addSink(new SinkFuncion());

/*four*/
pw_keyed_stream = data_stream
            .filter(new FilterFunction())
            .map(new MapClass())
            .keyBy(k -> k.id+ new Date(k.timestamp.getTime()));
  pw_keyed_stream.addSink(new SinkFuncion());

/*five-> stateful operator: ProcessWindowFunction*/
pw_keyed_stream.window(TumblingEventTimeWindows.of(Time.ninutes(30)))
          .process(new MyProcessWindowFunction())
          .addSink(new SinkFuncion());

/*Six-> stateful operator with 4 ConcurrentHashMap into the state: RichFlatMapFunction*/
keyed_stream.flatmap(new FlatMapFunction())
            .addSink(new SinkFuncion());

/*seven-> stateful operator: ProcessWindowFunction*/
keyed_stream.window(TumblingEventTimeWindows.of(Time.ninutes(10)))
            .process(new MyProcessWindowFunction())
            .addSink(new SinkFuncion());

/*eight-> stateful operator: RichFlatMapFunction*/
data_stream.filter(new FilterFunction())
           .keyBy(k -> k.type.equals("something") ? k.one : k.two)
           .flatmap(new FlatMapFunction())
           .addSink(new SinkFuncion());

变化:
从jdk8升级到jdk9(与两个jdk相同)
从cms到g1收集器的变化(与两个gc相同)
见下图:

我预计当交通量下降时,负载也会下降,但是这些山也会留在那里,即使负载很低。
再次感谢。谨致问候!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题