java.lang.OutOfMemoryError: Metaspace with Apache Flink 1.10 deployed on a Kubernetes cluster

nfs0ujit · asked 2021-06-21 · in Flink

I am using Apache Flink 1.10 to consume RabbitMQ messages and sink them to MySQL through a RESTful API. The TaskManager now exits with the following error:

2020-07-02 05:36:01,464 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Map -> Timestamps/Watermarks (1/1) (c5607a434c6da19872cf5e0a8dda761a) switched from DEPLOYING to RUNNING.
2020-07-02 05:36:02,369 INFO  org.apache.flink.runtime.taskmanager.Task                     - Window(TumblingEventTimeWindows(5000), EventTimeTrigger, SumField, CollectionWindow) -> Sink: Unnamed (1/1) (6dfa060a1e92735ba60e28c7e5871a56) switched from RUNNING to FAILED.
java.lang.OutOfMemoryError: Metaspace
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:66)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1925)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2344)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2268)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2126)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
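Before tuning anything, it helps to measure how much metaspace the TaskManager JVM actually uses and whether it keeps growing across job submissions. A minimal sketch using only the JDK's management API (the class name `MetaspaceProbe` is made up here; the pool name "Metaspace" is what HotSpot JVMs report):

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;

public class MetaspaceProbe {

    /** Returns the currently used Metaspace bytes, or -1 if the pool is absent. */
    public static long usedMetaspaceBytes() {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if ("Metaspace".equals(pool.getName())) {
                return pool.getUsage().getUsed();
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long used = usedMetaspaceBytes();
        System.out.println("Metaspace used: " + (used / (1024 * 1024)) + " MiB");
    }
}
```

Logging this value periodically (or after each job submission) shows whether usage plateaus, which suggests the limit is simply too small, or grows without bound, which suggests a classloader leak.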

Here is the relevant part of my Flink job code:

public static void pumpConsumeHandler() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkUtil.initEnv(env);

        // Consume JSON messages from RabbitMQ.
        RMQSource<String> dataSource = FlinkUtil.initDatasource("goblin.pump.report.realtime");
        DataStream<String> dataStreamSource = env.addSource(dataSource);

        // Deserialize each message into a ReportPump and assign event-time watermarks.
        DataStream<ReportPump> record =
                dataStreamSource.map(new MapFunction<String, ReportPump>() {
                    @Override
                    public ReportPump map(String value) throws Exception {
                        // Note: this allocates a new ObjectMapper per record; it could be reused.
                        ObjectMapper mapper = new ObjectMapper();
                        return mapper.readValue(value, ReportPump.class);
                    }
                }).assignTimestampsAndWatermarks(new CustomAssign<ReportPump>());

        // Key by (gameType, tenantId), aggregate in 5-second tumbling event-time
        // windows, and persist each window's result through the REST sink below.
        record.keyBy(new KeySelector<ReportPump, Tuple2<Integer, Long>>() {
                    @Override
                    public Tuple2<Integer, Long> getKey(ReportPump value) throws Exception {
                        return Tuple2.of(value.getGameType(), value.getTenantId());
                    }
                })
                .timeWindow(Time.seconds(5))
                .reduce(new SumField(), new CollectionWindow())
                .addSink(new SinkFunction<List<ReportPump>>() {
                    @Override
                    public void invoke(List<ReportPump> reportPumps, Context context) throws Exception {
                        persistCollectData(reportPumps);
                    }
                });

        env.execute(PumpRealtimeHandler.class.getName());
    }

    public static void persistCollectData(List<ReportPump> records) throws Exception {
        PumpConsumeService service = FlinkUtil.InitRetrofit().create(PumpConsumeService.class);
        Call<ResponseBody> call = service.savePumpRecords(records);
        ResponseBody body = call.execute().body();
        String str = new String(body.bytes(), "UTF-8");
        ObjectMapper mapper = new ObjectMapper();
        Response response = mapper.readValue(str, Response.class);
        if (!ServiceException.API_OK.equals(response.getResultCode())) {
            log.error("save error: " + str);
            throw new Exception("save error: " + str);
        } else {
            log.debug("success: " + str);
        }
    }

To work around this, I adjusted the configuration:

taskmanager.memory.jvm-metaspace.size: 256m  (raised from the default 96m)
taskmanager.memory.process.size: 2g  (raised from 1g)

I run two Flink jobs on this cluster. What should I do to avoid this error, and how large should I configure the metaspace?
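For context, both values above live in `flink-conf.yaml`, and in Flink 1.10's memory model the metaspace is carved out of the total process size. With child-first classloading, each job submission (and each restart) loads its own copy of the user classes, so two jobs need roughly twice the metaspace headroom of one. A hedged example (the 512m figure is an assumption for illustration, not a documented recommendation):

```yaml
# flink-conf.yaml (Flink 1.10)
taskmanager.memory.process.size: 2g
# Metaspace is a fixed slice of the process size; size it for
# (number of jobs) x (classes loaded per job), plus restart churn.
taskmanager.memory.jvm-metaspace.size: 512m   # assumed value
```

If usage still grows without bound after raising the limit, the cause is more likely a classloader leak (user code pinning classes across restarts) than an undersized metaspace.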
