I have read basically every article on this, but I still have memory problems. I use Airflow to schedule my jobs on EMR, and everything worked until two days ago, when all the jobs started running out of memory. A job reads from an S3 bucket, runs some aggregations, and writes the result back to S3. I have two scheduled job types - hourly and daily - and for each of them I set a different configuration, shown below:
public static Configuration getJobConfiguration(String interval) {
    Configuration jobConfiguration = new Configuration();
    jobConfiguration.set("job.name", "JobImport Job");
    jobConfiguration.set("mapreduce.framework.name", "yarn");
    jobConfiguration.set("fs.defaultFS", "hdfs://xxxx:8020");
    jobConfiguration.set("dfs.client.use.datanode.hostname", "true");
    jobConfiguration.set("dfs.client.block.write.replace-datanode-on-failure.policy", "ALWAYS");
    jobConfiguration.set("dfs.client.block.write.replace-datanode-on-failure.best-effort", "true");
    if (interval.equals("hourly")) {
        jobConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "1000000");
        jobConfiguration.set("mapreduce.reduce.memory.mb", "6144"); // max memory for reducer
        jobConfiguration.set("mapreduce.map.memory.mb", "4098");    // max memory for mapper
        jobConfiguration.set("mapreduce.reduce.java.opts", "-Xmx4098m"); // reducer java heap
        jobConfiguration.set("mapreduce.map.java.opts", "-Xmx3075m");    // mapper java heap
    } else {
        jobConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "20000000");
        jobConfiguration.set("mapreduce.reduce.memory.mb", "8192"); // max memory for reducer
        jobConfiguration.set("mapreduce.map.memory.mb", "6144");    // max memory for mapper
        jobConfiguration.set("mapreduce.reduce.java.opts", "-Xmx6144m"); // reducer java heap
        jobConfiguration.set("mapreduce.map.java.opts", "-Xmx4098m");    // mapper java heap
    }
    return jobConfiguration;
}
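For reference, the rule of thumb I have seen (not sure it is authoritative) is to keep the JVM heap (-Xmx) at roughly 80% of the YARN container size, leaving headroom for off-heap memory such as thread stacks and native buffers. A quick helper to compute that (my own sketch, not part of the job):

```java
public class HeapSizing {
    // Assumption: give the JVM heap ~80% of the YARN container size.
    // This is a common convention, not a hard requirement.
    static int heapForContainerMb(int containerMb) {
        return (int) (containerMb * 0.8);
    }

    public static void main(String[] args) {
        System.out.println(heapForContainerMb(6144)); // hourly reducer container -> 4915
        System.out.println(heapForContainerMb(4096)); // 4 GiB mapper container   -> 3276
    }
}
```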
My mapper and reducer look like this:
private JsonParser parser = new JsonParser();
private Text apiKeyText = new Text();
private Text eventsText = new Text();

@Override
public void map(LongWritable key, Text value, Context context) {
    String line = value.toString();
    String[] hourlyEvents = line.split("\n");
    JsonElement elem;
    JsonObject ev;
    try {
        for (String events : hourlyEvents) {
            elem = this.parser.parse(events);
            ev = elem.getAsJsonObject();
            // skip records that are missing the fields we need
            if (!ev.has("api_key") || !ev.has("events")) {
                continue;
            }
            this.apiKeyText.set(ev.get("api_key").getAsString());
            this.eventsText.set(ev.getAsJsonArray("events").toString());
            context.write(this.apiKeyText, this.eventsText);
        }
    } catch (IOException | InterruptedException e) {
        logger.error(e.getMessage(), e);
    }
}
// ------------------
// Separate class
// ------------------
private JsonParser parser = new JsonParser();
private Text events = new Text();

@Override
public void reduce(Text key, Iterable<Text> values, Context context) {
    try {
        JsonObject obj = new JsonObject();
        JsonArray dailyEvents = new JsonArray();
        // merge all event arrays for this api_key into one array
        for (Text eventsTmp : values) {
            JsonArray tmp = this.parser.parse(eventsTmp.toString()).getAsJsonArray();
            for (JsonElement ev : tmp) {
                dailyEvents.add(ev);
            }
        }
        obj.addProperty("api_key", key.toString());
        obj.add("events", dailyEvents);
        this.events.set(obj.toString());
        context.write(NullWritable.get(), this.events);
    } catch (IOException | InterruptedException e) {
        logger.error(e.getMessage(), e);
    }
}
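One thing I suspect: the reducer above buffers every event for a key in a single JsonArray (and then a single Text) before writing, so heap usage grows with the largest group. A rough sketch of streaming the merged array out with plain strings instead of building it in memory - the helper and its signature are my own, not the actual job code, and in the real reducer the writes would go to the output stream rather than a StringWriter:

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;

public class StreamingReduceSketch {
    // Sketch: emit the merged JSON incrementally so only one input value
    // is held in memory at a time, instead of one JsonArray per key.
    static void reduceStreaming(String apiKey, Iterable<String> eventArrays,
                                Writer out) throws IOException {
        out.write("{\"api_key\":\"" + apiKey + "\",\"events\":[");
        boolean first = true;
        for (String arr : eventArrays) {
            // each value is already a JSON array like "[{...},{...}]";
            // strip its brackets and append the inner elements directly
            String inner = arr.substring(1, arr.length() - 1);
            if (inner.isEmpty()) continue;
            if (!first) out.write(",");
            out.write(inner);
            first = false;
        }
        out.write("]}");
    }

    public static void main(String[] args) throws IOException {
        StringWriter sw = new StringWriter();
        reduceStreaming("k1",
                java.util.Arrays.asList("[{\"a\":1}]", "[{\"b\":2}]"), sw);
        System.out.println(sw); // {"api_key":"k1","events":[{"a":1},{"b":2}]}
    }
}
```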
Here is the dump after the MapReduce job:

INFO mapreduce.Job: Counters: 56
File System Counters
    FILE: Number of bytes read=40
    FILE: Number of bytes written=69703431
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=3250
    HDFS: Number of bytes written=0
    HDFS: Number of read operations=58
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=4
    S3N: Number of bytes read=501370932
    S3N: Number of bytes written=0
    S3N: Number of read operations=0
    S3N: Number of large read operations=0
    S3N: Number of write operations=0
Job Counters
    Failed reduce tasks=4
    Killed reduce tasks=1
    Launched map tasks=26
    Launched reduce tasks=6
    Data-local map tasks=26
    Total time spent by all maps in occupied slots (ms)=35841984
    Total time spent by all reduces in occupied slots (ms)=93264640
    Total time spent by all map tasks (ms)=186677
    Total time spent by all reduce tasks (ms)=364315
    Total vcore-milliseconds taken by all map tasks=186677
    Total vcore-milliseconds taken by all reduce tasks=364315
    Total megabyte-milliseconds taken by all map tasks=1146943488
    Total megabyte-milliseconds taken by all reduce tasks=2984468480
Map-Reduce Framework
    Map input records=24
    Map output records=24
    Map output bytes=497227681
    Map output materialized bytes=66055825
    Input split bytes=3250
    Combine input records=0
    Combine output records=0
    Reduce input groups=0
    Reduce shuffle bytes=832
    Reduce input records=0
    Reduce output records=0
    Spilled Records=24
    Shuffled Maps=52
    Failed Shuffles=0
    Merged Map outputs=52
    GC time elapsed (ms)=23254
    CPU time spent (ms)=274180
    Physical memory (bytes) snapshot=25311526912
    Virtual memory (bytes) snapshot=183834742784
    Total committed heap usage (bytes)=27816099840
Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
File Input Format Counters
    Bytes Read=501370932
File Output Format Counters
    Bytes Written=0
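If I read the counters right, the reducers died (Failed reduce tasks=4, Reduce input records=0) and the map output records are individually very large. Doing the arithmetic on the counters themselves (nothing here beyond the numbers above):

```java
public class CounterMath {
    public static void main(String[] args) {
        // values copied from the job counters above
        long mapOutputBytes = 497_227_681L;
        long mapOutputRecords = 24L;
        long avgBytes = mapOutputBytes / mapOutputRecords;
        long avgMb = avgBytes / (1024 * 1024);
        System.out.println(avgBytes); // 20717820 bytes per map output record
        System.out.println(avgMb);    // i.e. about 19-20 MB each
    }
}
```

So each (key, value) pair carries roughly 20 MB of JSON, which makes the per-record buffering in the reducer expensive.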
The cluster I am using is emr-5.2.0 with 2 nodes, each an m3.xlarge instance. On EMR I run the jar with yarn jar ... from steps (each step is created from Airflow).
I do not change any parameters other than the ones in the configuration above, so everything else uses the defaults. How can I fix the error: Java heap space?