在上一个问题(这里)中,我问了为什么我的flink 1.10.1作业会使cpu负载在运行一小时后开始增加,而不会下降,然后,我开始用visualvm、jprofiler和memoryanalyzer监视我的作业,我发现了一些错误,包括不必要的对象创建、hashmaps和其他一些已修复的问题。然后,内存消耗和cpu负载都很低,但是cpu负载几乎每过1:45小时就会出现一次类似于“丘陵”的现象。
我有与以前相同的执行图,但我也将把它放在这里:
DataStream<Event> from_source = rabbitConsumer
.flatMap(new RabbitMQConsumer())
.assignTimestampsAndWatermarks(new PeriodicExtractor());
SingleOutputStreamOperator<Event> data_stream = from_source
.filter(new NullidsFilterFunction())
KeyedStream<String, Event> keyed_stream = data_stream.keyby(k->k.id);
/*one-> stateful operator: ProcessWindowFunction*/
data_stream.map(new EventCount(x))
.keyBy(k -> new Date(k.timestamp.getTime()).toString())
.window(TumblingEventTimeWindows.of(Time.ninutes(30)))
.process(new MyProcessWindowFunction())
.addSink(new SinkFuncion());
/*two-> stateful operator: ProcessWindowFunction*/
keyed_stream.window(TumblingEventTimeWindows.of(Time.ninutes(10)))
.process(new MyProcessWindowFunction())
.addSink(new SinkFuncion());
/*three*/
keyed_stream.filter(new FilterFunction())
.map(new MapClass())
.addSink(new SinkFuncion());
/*four*/
pw_keyed_stream = data_stream
.filter(new FilterFunction())
.map(new MapClass())
.keyBy(k -> k.id+ new Date(k.timestamp.getTime()));
pw_keyed_stream.addSink(new SinkFuncion());
/*five-> stateful operator: ProcessWindowFunction*/
pw_keyed_stream.window(TumblingEventTimeWindows.of(Time.ninutes(30)))
.process(new MyProcessWindowFunction())
.addSink(new SinkFuncion());
/*Six-> stateful operator with 4 ConcurrentHashMap into the state: RichFlatMapFunction*/
keyed_stream.flatmap(new FlatMapFunction())
.addSink(new SinkFuncion());
/*seven-> stateful operator: ProcessWindowFunction*/
keyed_stream.window(TumblingEventTimeWindows.of(Time.ninutes(10)))
.process(new MyProcessWindowFunction())
.addSink(new SinkFuncion());
/*eight-> stateful operator: RichFlatMapFunction*/
data_stream.filter(new FilterFunction())
.keyBy(k -> k.type.equals("something") ? k.one : k.two)
.flatmap(new FlatMapFunction())
.addSink(new SinkFuncion());
变化:
从jdk8升级到jdk9(与两个jdk相同)
从cms到g1收集器的变化(与两个gc相同)
见下图:
我预计当交通量下降时,负载也会下降,但是这些山也会留在那里,即使负载很低。
再次感谢。谨致问候!
暂无答案!
目前还没有任何答案,快来回答吧!