我在hadoop map reduce中编写了一个简单的hash连接程序。其思路如下:
使用hadoop框架提供的distributedcache将一个小表分发给每个Map器。大表分布在Map器上,拆分大小为64m。Map程序的设置代码创建一个hashmap,读取这个小表中的每一行。在Map程序代码中,在hashmap上搜索(get)每个键,如果该键存在于hashmap中,则将其写出。此时不需要减速器。这是我们使用的代码:
public class Map extends Mapper<LongWritable, Text, Text, Text> {
private HashMap<String, String> joinData = new HashMap<String, String>();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String textvalue = value.toString();
String[] tokens;
tokens = textvalue.split(",");
if (tokens.length == 2) {
String joinValue = joinData.get(tokens[0]);
if (null != joinValue) {
context.write(new Text(tokens[0]), new Text(tokens[1] + ","
+ joinValue));
}
}
}
public void setup(Context context) {
try {
Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context
.getConfiguration());
if (null != cacheFiles && cacheFiles.length > 0) {
String line;
String[] tokens;
BufferedReader br = new BufferedReader(new FileReader(
cacheFiles[0].toString()));
try {
while ((line = br.readLine()) != null) {
tokens = line.split(",");
if (tokens.length == 2) {
joinData.put(tokens[0], tokens[1]);
}
}
System.exit(0);
} finally {
br.close();
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
在测试这个代码时,我们的小表是32m,大表是128m,一个主节点和两个从节点。
当我有一个256m的堆时,上述输入的代码失败。我在mapred-site.xml文件的mapred.child.java.opts中使用-xmx256m。当我把它增加到300m时,它的速度非常慢,512m达到了它的最大吞吐量。
我不明白我的Map器在哪里消耗了这么多内存。使用上面给出的输入和Map器代码,我不希望我的堆内存达到256m,但是它失败了,出现了java堆空间错误。
我会很感激,如果你能给一些深入了解为什么Map器是消耗这么多内存。
编辑:
13/03/11 09:37:33 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/03/11 09:37:33 INFO input.FileInputFormat: Total input paths to process : 1
13/03/11 09:37:33 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/03/11 09:37:33 WARN snappy.LoadSnappy: Snappy native library not loaded
13/03/11 09:37:34 INFO mapred.JobClient: Running job: job_201303110921_0004
13/03/11 09:37:35 INFO mapred.JobClient: map 0% reduce 0%
13/03/11 09:39:12 INFO mapred.JobClient: Task Id : attempt_201303110921_0004_m_000000_0, Status : FAILED
Error: GC overhead limit exceeded
13/03/11 09:40:43 INFO mapred.JobClient: Task Id : attempt_201303110921_0004_m_000001_0, Status : FAILED
org.apache.hadoop.io.SecureIOUtils$AlreadyExistsException: File /usr/home/hadoop/hadoop-1.0.3/libexec/../logs/userlogs/job_201303110921_0004/attempt_201303110921_0004_m_000001_0/log.tmp already exists
at org.apache.hadoop.io.SecureIOUtils.insecureCreateForWrite(SecureIOUtils.java:130)
at org.apache.hadoop.io.SecureIOUtils.createForWrite(SecureIOUtils.java:157)
at org.apache.hadoop.mapred.TaskLog.writeToIndexFile(TaskLog.java:312)
at org.apache.hadoop.mapred.TaskLog.syncLogs(TaskLog.java:385)
at org.apache.hadoop.mapred.Child$4.run(Child.java:257)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:416)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
attempt_201303110921_0004_m_000001_0: Exception in thread "Thread for syncLogs" java.lang.OutOfMemoryError: Java heap space
attempt_201303110921_0004_m_000001_0: at java.io.BufferedOutputStream.<init>(BufferedOutputStream.java:76)
attempt_201303110921_0004_m_000001_0: at java.io.BufferedOutputStream.<init>(BufferedOutputStream.java:59)
attempt_201303110921_0004_m_000001_0: at org.apache.hadoop.mapred.TaskLog.writeToIndexFile(TaskLog.java:312)
attempt_201303110921_0004_m_000001_0: at org.apache.hadoop.mapred.TaskLog.syncLogs(TaskLog.java:385)
attempt_201303110921_0004_m_000001_0: at org.apache.hadoop.mapred.Child$3.run(Child.java:141)
attempt_201303110921_0004_m_000001_0: log4j:WARN No appenders could be found for logger (org.apache.hadoop.hdfs.DFSClient).
attempt_201303110921_0004_m_000001_0: log4j:WARN Please initialize the log4j system properly.
13/03/11 09:42:18 INFO mapred.JobClient: Task Id : attempt_201303110921_0004_m_000001_1, Status : FAILED
Error: GC overhead limit exceeded
13/03/11 09:43:48 INFO mapred.JobClient: Task Id : attempt_201303110921_0004_m_000001_2, Status : FAILED
Error: GC overhead limit exceeded
13/03/11 09:45:09 INFO mapred.JobClient: Job complete: job_201303110921_0004
13/03/11 09:45:09 INFO mapred.JobClient: Counters: 7
13/03/11 09:45:09 INFO mapred.JobClient: Job Counters
13/03/11 09:45:09 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=468506
13/03/11 09:45:09 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
13/03/11 09:45:09 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
13/03/11 09:45:09 INFO mapred.JobClient: Launched map tasks=6
13/03/11 09:45:09 INFO mapred.JobClient: Data-local map tasks=6
13/03/11 09:45:09 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0
13/03/11 09:45:09 INFO mapred.JobClient: Failed map tasks=1
1条答案
按热度按时间eanckbw91#
很难确定内存消耗的去向,但这里有几点提示:
你在创造2
Text
输入的每一行的对象。你应该用2Text
一次初始化的对象Mapper
作为类变量,然后为每行调用text.set(...)
. 这是map/reduce模式的常用模式,可以节省相当多的内存开销。你应该考虑使用
SequenceFile
为您的输入设置格式,这将避免使用textValue.split
,则可以将此数据直接作为数组使用。我已经读过好几遍了,像这样进行字符串拆分可能会非常密集,所以如果内存确实有问题的话,应该尽量避免。你也可以考虑使用KeyValueTextInputFormat
例如,如果只关心键/值对。如果这还不够,我建议您查看这个链接,特别是第7部分,它提供了一个非常简单的方法来分析应用程序,并查看在哪里分配了什么。