I am using Apache Flink 1.10 to consume RabbitMQ messages and write them to MySQL through a RESTful API. The TaskManager now exits and throws this error:
2020-07-02 05:36:01,464 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Map -> Timestamps/Watermarks (1/1) (c5607a434c6da19872cf5e0a8dda761a) switched from DEPLOYING to RUNNING.
2020-07-02 05:36:02,369 INFO org.apache.flink.runtime.taskmanager.Task - Window(TumblingEventTimeWindows(5000), EventTimeTrigger, SumField, CollectionWindow) -> Sink: Unnamed (1/1) (6dfa060a1e92735ba60e28c7e5871a56) switched from RUNNING to FAILED.
java.lang.OutOfMemoryError: Metaspace
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:66)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1925)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
This is part of my Apache Flink job code:
public static void pumpConsumeHandler() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    FlinkUtil.initEnv(env);
    RMQSource dataSource = FlinkUtil.initDatasource("goblin.pump.report.realtime");
    DataStream<String> dataStreamSource = env.addSource(dataSource);
    DataStream<ReportPump> record =
            dataStreamSource.map(new MapFunction<String, ReportPump>() {
                @Override
                public ReportPump map(String value) throws Exception {
                    ObjectMapper mapper = new ObjectMapper();
                    ReportPump pumpRecord = mapper.readValue(value, ReportPump.class);
                    return pumpRecord;
                }
            }).assignTimestampsAndWatermarks(new CustomAssign<ReportPump>());
    record.keyBy(new KeySelector<ReportPump, Tuple2<Integer, Long>>() {
                @Override
                public Tuple2<Integer, Long> getKey(ReportPump value) throws Exception {
                    return Tuple2.of(value.getGameType(), value.getTenantId());
                }
            })
            .timeWindow(Time.seconds(5))
            .reduce(new SumField(), new CollectionWindow())
            .addSink(new SinkFunction<List<ReportPump>>() {
                @Override
                public void invoke(List<ReportPump> reportPumps, Context context) throws Exception {
                    persistCollectData(reportPumps);
                }
            });
    env.execute(PumpRealtimeHandler.class.getName());
}

public static void persistCollectData(List<ReportPump> records) throws Exception {
    PumpConsumeService service = FlinkUtil.InitRetrofit().create(PumpConsumeService.class);
    Call<ResponseBody> call = service.savePumpRecords(records);
    ResponseBody body = call.execute().body();
    String str = new String(body.bytes(), "UTF-8");
    ObjectMapper mapper = new ObjectMapper();
    Response response = mapper.readValue(str, Response.class);
    if (!ServiceException.API_OK.equals(response.getResultCode())) {
        log.error("save error:" + str);
        throw new Exception();
    } else {
        log.debug("success:" + str);
    }
}
To work around the problem, I adjusted the configuration:
taskmanager.memory.jvm-metaspace.size: 256mb (raised from the default 96 MB to 256 MB)
taskmanager.memory.process.size: 2g (raised from 1 GB to 2 GB)
I run two Flink jobs. What should I do to avoid this problem, and how large should I configure the metaspace?
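
To pick a metaspace size from measurements rather than a guess, it may help to log how much metaspace the TaskManager JVM actually uses while both jobs run. Below is a minimal sketch that uses only the standard java.lang.management API. The class name MetaspaceProbe and its Snapshot type are hypothetical and not part of Flink, and the code assumes a HotSpot JVM, where the memory pool is named "Metaspace".

```java
import java.lang.management.ClassLoadingMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;

// Hypothetical helper (not a Flink class): reads metaspace and class-loading counters.
final class MetaspaceProbe {

    /** Point-in-time view of the counters. */
    static final class Snapshot {
        final long usedBytes;       // -1 if no pool named "Metaspace" was found
        final long maxBytes;        // -1 if the JVM reports no limit
        final int loadedClasses;    // classes currently loaded
        final long unloadedClasses; // classes unloaded since JVM start

        Snapshot(long usedBytes, long maxBytes, int loadedClasses, long unloadedClasses) {
            this.usedBytes = usedBytes;
            this.maxBytes = maxBytes;
            this.loadedClasses = loadedClasses;
            this.unloadedClasses = unloadedClasses;
        }

        @Override
        public String toString() {
            return "metaspace used=" + usedBytes + "B max=" + maxBytes + "B"
                    + " loadedClasses=" + loadedClasses
                    + " unloadedClasses=" + unloadedClasses;
        }
    }

    static Snapshot snapshot() {
        long used = -1L;
        long max = -1L;
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            // Assumption: HotSpot naming; other JVMs may use a different pool name.
            if ("Metaspace".equals(pool.getName())) {
                MemoryUsage usage = pool.getUsage();
                if (usage != null) {
                    used = usage.getUsed();
                    max = usage.getMax();
                }
            }
        }
        ClassLoadingMXBean classes = ManagementFactory.getClassLoadingMXBean();
        return new Snapshot(used, max,
                classes.getLoadedClassCount(), classes.getUnloadedClassCount());
    }

    public static void main(String[] args) {
        System.out.println(snapshot());
    }
}
```

Logging MetaspaceProbe.snapshot() periodically from inside the job, for example once per sink invocation at debug level, would show whether usage levels off below the configured limit or keeps growing each time a job is restarted or resubmitted. Steady growth with a flat unloadedClasses count would suggest that classes from earlier job runs are not being unloaded, in which case raising the limit only delays the error.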