EMR MapReduce error: Java heap space

zlwx9yxi · posted 2021-05-29 in Hadoop

I've read basically every post on this topic, but I still have memory problems. I use Airflow to instantiate my jobs on EMR; everything ran fine until two days ago, when all the jobs started running out of memory. The jobs read from an S3 bucket, I do some aggregation, and then I save the result back to S3. I have two scheduled job types, hourly and daily, and I give each of them different settings, as shown below:

public static Configuration getJobConfiguration(String interval) {

    Configuration jobConfiguration = new Configuration();

    jobConfiguration.set("job.name", "JobImport Job");

    jobConfiguration.set("mr.mapreduce.framework.name", "yarn);

    jobConfiguration.set("fs.defaultFS", "hdfs://xxxx:8020");
    jobConfiguration.set("dfs.client.use.datanode.hostname", "true");
    jobConfiguration.set("dfs.client.block.write.replace-datanode-on-failure.policy", "ALWAYS");
    jobConfiguration.set("dfs.client.block.write.replace-datanode-on-failure.best-effort", "true");

    if (interval.equals("hourly")) {
        jobConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "1000000");
        jobConfiguration.set("mapreduce.reduce.memory.mb", "6144"); // max memory for reducer
        jobConfiguration.set("mapreduce.map.memory.mb", "4098");    // max memory for mapper
        jobConfiguration.set("mapreduce.reduce.java.opts", "-Xmx4098m"); // reducer java heap
        jobConfiguration.set("mapreduce.map.java.opts", "-Xmx3075m");    // mapper java heap
    }
    else {
        jobConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "20000000");
        jobConfiguration.set("mapreduce.reduce.memory.mb", "8192"); // max memory for reducer
        jobConfiguration.set("mapreduce.map.memory.mb", "6144");    // max memory for mapper
        jobConfiguration.set("mapreduce.reduce.java.opts", "-Xmx6144m"); // reducer java heap
        jobConfiguration.set("mapreduce.map.java.opts", "-Xmx4098m");    // mapper java heap
    }

    return jobConfiguration;
}
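
For reference, a common rule of thumb (not from the original post): in YARN, mapreduce.map.memory.mb and mapreduce.reduce.memory.mb size the whole container, while the -Xmx in mapreduce.*.java.opts sizes only the JVM heap inside it, so the heap is usually kept at roughly 80% of the container to leave headroom for non-heap memory (thread stacks, direct buffers, metaspace). A minimal sketch of an hourly profile that keeps that ratio, with illustrative numbers:

    // Illustrative sizing only: keep -Xmx at ~80% of the YARN container size.
    jobConfiguration.set("mapreduce.map.memory.mb", "4096");         // mapper container
    jobConfiguration.set("mapreduce.map.java.opts", "-Xmx3276m");    // ~80% of 4096
    jobConfiguration.set("mapreduce.reduce.memory.mb", "6144");      // reducer container
    jobConfiguration.set("mapreduce.reduce.java.opts", "-Xmx4915m"); // ~80% of 6144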

My mapper and reducer look like this:

import java.io.IOException;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Class and field declarations reconstructed for completeness; the class name
// and the logger setup are assumed, since the post shows only the fields and
// the map() method.
public class EventMapper extends Mapper<LongWritable, Text, Text, Text> {

    private static final Logger logger = LoggerFactory.getLogger(EventMapper.class);

    private final JsonParser parser = new JsonParser();
    private final Text apiKeyText = new Text();
    private final Text eventsText = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context) {

        String line = value.toString();
        String[] hourlyEvents = line.split("\n");

        JsonElement elem;
        JsonObject ev;

        try {
            for (String events : hourlyEvents) {

                elem = this.parser.parse(events);
                ev = elem.getAsJsonObject();

                // skip records that are missing either field
                if (!ev.has("api_key") || !ev.has("events")) {
                    continue;
                }

                // emit (api_key, serialized events array)
                this.apiKeyText.set(ev.get("api_key").getAsString());
                this.eventsText.set(ev.getAsJsonArray("events").toString());

                context.write(this.apiKeyText, this.eventsText);
            }
        } catch (IOException | InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
    }
}

// ------------------
// Separate class
// ------------------

import java.io.IOException;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Class and field declarations reconstructed for completeness; the class name
// and the logger setup are assumed.
public class EventReducer extends Reducer<Text, Text, NullWritable, Text> {

    private static final Logger logger = LoggerFactory.getLogger(EventReducer.class);

    private final JsonParser parser = new JsonParser();
    private final Text events = new Text();

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) {
        try {

            JsonObject obj = new JsonObject();
            JsonArray dailyEvents = new JsonArray();

            // concatenate every events array seen for this api_key into a
            // single in-memory JsonArray
            for (Text eventsTmp : values) {
                JsonArray tmp = this.parser.parse(eventsTmp.toString()).getAsJsonArray();
                for (JsonElement ev : tmp) {
                    dailyEvents.add(ev);
                }
            }

            obj.addProperty("api_key", key.toString());
            obj.add("events", dailyEvents);

            // write the merged document as one output record
            this.events.set(obj.toString());

            context.write(NullWritable.get(), this.events);
        } catch (IOException | InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
    }
}
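
For context, a driver wiring the configuration and the two classes together might look like the sketch below. The class names EventsJob, EventMapper and EventReducer are assumptions (the post never shows them), the input/output paths are placeholders, and getJobConfiguration(...) is the helper from the top of the post, assumed to be in scope:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EventsJob {

    public static void main(String[] args) throws Exception {
        // args[0] selects the "hourly" or "daily" memory profile;
        // getJobConfiguration(...) is the helper shown earlier in the post
        Configuration conf = getJobConfiguration(args[0]);

        Job job = Job.getInstance(conf, "JobImport Job");
        job.setJarByClass(EventsJob.class);
        job.setMapperClass(EventMapper.class);
        job.setReducerClass(EventReducer.class);

        // mapper emits (Text api_key, Text events); reducer emits (NullWritable, Text)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[1]));   // e.g. the s3:// input prefix
        FileOutputFormat.setOutputPath(job, new Path(args[2])); // e.g. the s3:// output prefix

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}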

This is the dump after the MapReduce job:

INFO mapreduce.Job: Counters: 56
    File System Counters
        FILE: Number of bytes read=40
        FILE: Number of bytes written=69703431
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=3250
        HDFS: Number of bytes written=0
        HDFS: Number of read operations=58
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
        S3N: Number of bytes read=501370932
        S3N: Number of bytes written=0
        S3N: Number of read operations=0
        S3N: Number of large read operations=0
        S3N: Number of write operations=0
    Job Counters
        Failed reduce tasks=4
        Killed reduce tasks=1
        Launched map tasks=26
        Launched reduce tasks=6
        Data-local map tasks=26
        Total time spent by all maps in occupied slots (ms)=35841984
        Total time spent by all reduces in occupied slots (ms)=93264640
        Total time spent by all map tasks (ms)=186677
        Total time spent by all reduce tasks (ms)=364315
        Total vcore-milliseconds taken by all map tasks=186677
        Total vcore-milliseconds taken by all reduce tasks=364315
        Total megabyte-milliseconds taken by all map tasks=1146943488
        Total megabyte-milliseconds taken by all reduce tasks=2984468480
    Map-Reduce Framework
        Map input records=24
        Map output records=24
        Map output bytes=497227681
        Map output materialized bytes=66055825
        Input split bytes=3250
        Combine input records=0
        Combine output records=0
        Reduce input groups=0
        Reduce shuffle bytes=832
        Reduce input records=0
        Reduce output records=0
        Spilled Records=24
        Shuffled Maps =52
        Failed Shuffles=0
        Merged Map outputs=52
        GC time elapsed (ms)=23254
        CPU time spent (ms)=274180
        Physical memory (bytes) snapshot=25311526912
        Virtual memory (bytes) snapshot=183834742784
        Total committed heap usage (bytes)=27816099840
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=501370932
    File Output Format Counters
        Bytes Written=0

The cluster I'm using is emr-5.2.0 with 2 nodes, each of them an m3.xlarge instance. On EMR I run the jar with yarn jar ... from steps (each step is instantiated from Airflow).
Apart from the parameters shown in the configuration above I don't change anything, so everything else is left at its default. How can I fix the "Error: Java heap space"?
