hadoop的mapreduce中的java详细数据流?

rqcrx0a6  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(289)

我正在努力理解mapreduce中的数据流。最近,当我的磁盘在reduce阶段耗尽内存时,一个要求很高的作业崩溃了。我发现很难估计我的工作需要多少磁盘。我将详细描述数据流。
如果有人能在mapreduce中更正、详细说明数据流或对我的系统的尺寸标注给出建议,这将是非常有帮助的。

群集配置:

我有一个包含30个奴隶的集群
12 gb内存
100 gb硬盘
4芯
我的Map任务与wordcount非常相似,所以它们需要很少的内存。我的reduce任务处理单词的排列组。由于需要加入相同的字,reduce函数需要一个始终<=3gb的临时哈希Map。
因为我有12gb的ram,我的hadoop守护进程需要1gb的堆+500mb的操作系统,所以我将map/reduce插槽划分如下:
4个带900mb堆的Map插槽和2个带3gb堆的reduce插槽。由于Map槽不需要超过300mb的内存,我已经设置了 io.sort.mb 到500 mb以改进Map阶段的内存排序。
我的作业有1800个map任务,每个任务生成8gb的map输出。因为我使用bzip2进行压缩,所以可以压缩到1GB。这意味着总的Map输出将低于2 tb,而我有3 tb的内存。
我选择了100个reduce任务,每个任务产生5gb的输出。
乍一看,一切都应该在记忆中。但显然,排序阶段需要压缩和解压缩,而复制阶段要求数据同时位于两个位置(我假设)。所以这里是它变得棘手的地方,这就是为什么我想完全理解数据流。我认为是这样的,但如果我错了,请纠正我:

数据流

Map任务生成大量溢出(在我的例子中是200个),这些溢出在内存中排序,然后在写入本地磁盘之前进行压缩。一旦Map任务完成,这给我200溢出文件,每10个合并( io.sort.factor ). 这意味着要解压缩10个文件:10x(5mb->40mb),因此这会产生0.4gb的压缩/解压缩开销。虽然我不确定200次泄漏发生第一轮合并后会发生什么。我想每个reduce任务都会先洗牌?所以文件的大小不会增加很多。如果我们从黑盒的Angular 来看这个问题,这意味着我们从200个压缩溢出开始,最后得到100个reduce任务的压缩文件(每个任务1个)。
因为我只有60个缩减器,现在每个节点60个压缩文件被复制到缩减器,这已经在Map阶段完成了。这可能意味着压缩文件暂时存在于源和目标上。这意味着在这种情况下,内存需求上升(暂时)160压缩文件每个节点是1.6倍的Map输出。Map输出是1800GB,所以我们最终得到2880GB,尽管是暂时的。所以第一阶段应该可以开始了。复制之后(我希望!)数据将从mappers本地输出目录中删除,因此我们拥有与map output相同的数据量,即1800gb。
现在,减速器中的排序阶段开始。我希望它不会在Map绘制者的记忆被清除之前启动?!由于要合并1800个map任务的输出,必须对其进行解压缩。reduce任务的输入大约是mapoutput/100=18gb的压缩数据。现在是如何解压的,它不能一次全部解压,因为我当时每个节点有144gb,而且因为我的工作没有崩溃,所以解压的执行稍微聪明一点。我的想法与map阶段相同:10个文件(1800个任务输出)同时被解压和合并。然后,解压缩将产生18gb/180=100mb的开销。问题是最后一轮合并是如何进行的,我记得在hadoop参考资料中读到,在只剩下一个文件之前,reducer不会一直合并。
在reduce阶段进行排序之后,reduce阶段将运行,这需要对输入记录进行解压缩,但由于每个reduce任务都使用500个输入键组,因此这应该不是一个真正的问题。
如前所述,reduce任务向dfs生成大约5gb的输出(总共0.5tb)。
前60项任务完成后,这项工作真的遇到麻烦了。在第二轮中,任务在排序阶段开始崩溃,这使我认为这与复制开销或解压缩开销有关。
确切的例外是: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for attempt_201310160819_0001_r_000068_1/intermediate.3 我希望我能详细解释我的程序流程以及我对mapreduce的理解。如果:
有人可以清除有关复制阶段和合并阶段的烟雾
以及提供克服工作崩溃的建议。
对我来说,能够准确地估计我需要多少内存是非常理想的,因为如果我尝试一个有40个节点的集群在运行5天后发生崩溃(这一次的经验是这样的),这将是不愉快的,因为截止日期越来越近了。

提前谢谢

我的工作失败的stacktrace如下:

例外情况1:

org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for attempt_201310160819_0001_r_000068_1/intermediate.3
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:381)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:127)
    at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:510)
    at org.apache.hadoop.mapred.Merger.merge(Merger.java:142)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.createKVIterator(ReduceTask.java:2539)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier.access$400(ReduceTask.java:661)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:399)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

例外2:

FAILEDjava.io.IOException: Task: attempt_201310160819_0001_r_000075_1 - The reduce copier failed
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:390)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:416)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for output/map_1622.out
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:381)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:127)
    at org.apache.hadoop.mapred.MapOutputFile.getInputFileForWrite(MapOutputFile.java:176)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2798)
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2762)

异常3:(可能是由diskchecker异常引起的)

Task attempt_201310160819_0001_r_000077_1 failed to report status for 2400 seconds. Killing!
mnemlml8

mnemlml81#

我刚收到praveen sripati发来的一封电子邮件,提到hadoop参考,我将把它粘贴到这里:
在复制阶段,数据是否同时存在于map和reduce任务中?何时清除Map输出?
下面是hadoop的权威指南
主机不会在第一个reducer检索到map输出后立即从磁盘中删除它们,因为reducer随后可能会失败。相反,他们会等到jobtracker(或applicationmaster)告诉他们删除它们,这是在作业完成之后。
这是非常重要的,Map输出仍然在磁盘上!!对我来说有点不幸。
5) 然后减速器开始合并。不完全确定是怎么做到的。是否每个reduce键合并到一个文件?还是为了一个任务而合并了所有的东西?
又是同一本书
复制完所有map输出后,reduce任务将进入排序阶段(由于排序是在map端执行的,因此应该正确地称为合并阶段),这将合并map输出,并保持它们的排序顺序。这是分两轮进行的。例如,如果有50个map输出,合并因子是10(默认值,由io.sort.factor属性控制,就像在map的merge中一样),那么将有5轮。每轮将10个文件合并为一个,因此最后将有5个中间文件。
谢谢,普拉文
这意味着合并后的文件数仅限于io.sort.factor。在我的例子中有10个段,每个段1.8gb。在上次合并期间,所有内容都必须解压缩,因此每轮需要1.8*10 gb=18 gb。

相关问题