hadoop进程记录是如何跨越块边界分割的?

vd8tlhqk  于 2021-06-04  发布在  Hadoop
关注(0)|答案(6)|浏览(241)

根据 Hadoop - The Definitive Guide fileinputformats定义的逻辑记录通常不能整齐地放入hdfs块中。例如,textinputformat的逻辑记录是行,这通常会跨越hdfs边界。这与程序的功能无关,例如,程序行没有丢失或中断,但值得了解,因为这确实意味着数据本地Map(即,与输入数据在同一主机上运行的Map)将执行一些远程读取。这导致的轻微开销通常不显著。
假设一个记录行被分成两个块(b1和b2)。处理第一个块(b1)的Map器将注意到最后一行没有eol分隔符,并从下一个数据块(b2)中提取行的剩余部分。
处理第二个块(b2)的Map器如何确定第一条记录不完整,并且应该从块(b2)中的第二条记录开始处理?

kgsdhlau

kgsdhlau1#

据我所知 FileSplit 为第一个块初始化,则调用默认构造函数。因此,“开始”和“长度”的值最初为零。在第一个块的处理结束时,如果最后一行不完整,那么长度值将大于拆分的长度,并且它也将读取下一个块的第一行。因此,第一个块的起始值将大于零,在这种情况下 LineRecordReader 将跳过第二个街区的第一行((见来源)
如果第一个块的最后一行完成,那么长度值将等于第一个块的长度,第二个块的开始值将为零。在这种情况下 LineRecordReader 不会跳过第一行并从开头读取第二个块。
有道理吗?

fsi0uk1n

fsi0uk1n2#

从linerecordreader.java的hadoop源代码中,我发现了一些注解:

// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
  start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;

由此我相信hadoop会为每一次分割多读一行(在当前分割的末尾,在下一次分割中读下一行),如果不是第一次分割,第一行将被丢弃。这样就不会有行记录丢失和不完整

nzk0hqpo

nzk0hqpo3#

Map绘制者不必交流。文件块在hdfs中,当前Map器(recordreader)可以读取包含行的剩余部分的块。这发生在幕后。

vql8enpb

vql8enpb4#

有趣的问题,我花了一些时间看代码的细节,这里是我的想法。拆分由客户机处理 InputFormat.getSplits ,因此查看fileinputformat会得到以下信息:
对于每个输入文件,获取文件长度、块大小并计算拆分大小 max(minSize, min(maxSize, blockSize)) 哪里 maxSize 对应于 mapred.max.split.size 以及 minSizemapred.min.split.size .
把文件分成不同的部分 FileSplit 基于上面计算的拆分大小。这里最重要的是 FileSplit 初始化为 start 与输入文件中的偏移量相对应的参数。在那一点上还没有处理这些线路。代码的相关部分如下所示:

while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
                           blkLocations[blkIndex].getHosts()));
  bytesRemaining -= splitSize;
}

在那之后,如果你看看 LineRecordReader 这是由 TextInputFormat ,这是处理行的地方:
当您初始化 LineRecordReader 它试图示例化 LineReader 这是一个抽象的概念,可以把行读一遍 FSDataInputStream . 有两种情况:
如果有 CompressionCodec 定义,则此编解码器负责处理边界。可能与你的问题无关。
但是,如果没有编解码器,那就是有趣的地方:如果 start 你的 InputSplit 与0不同,则返回1个字符,然后跳过由\n或\r\n(windows)标识的第一行!回溯很重要,因为如果您的线边界与分割边界相同,这将确保您不会跳过有效的线。以下是相关代码:

if (codec != null) {
   in = new LineReader(codec.createInputStream(fileIn), job);
   end = Long.MAX_VALUE;
} else {
   if (start != 0) {
     skipFirstLine = true;
     --start;
     fileIn.seek(start);
   }
   in = new LineReader(fileIn, job);
}
if (skipFirstLine) {  // skip first line and re-establish "start".
  start += in.readLine(new Text(), 0,
                    (int)Math.min((long)Integer.MAX_VALUE, end - start));
}
this.pos = start;

因此,由于分割是在客户机中计算的,Map程序不需要按顺序运行,每个Map程序都已经知道是否需要丢弃第一行。
所以基本上,如果你在同一个文件中有两行每100mb,为了简化,假设分割大小是64mb。然后在计算输入拆分时,我们将有以下场景:
包含此块的路径和主机的拆分1。从200-200=0mb开始初始化,长度64mb。
split 2从200-200+64=64mb开始初始化,长度64mb。
split 3从200-200+128=128mb开始初始化,长度64mb。
split 4从200-200+192=192mb开始初始化,长度8mb。
mapper a将处理split 1,start是0,所以不要跳过第一行,读取超过64mb限制的整行,所以需要远程读取。
Map器b将处理split 2,start is!=0所以跳过64mb-1byte之后的第一行,它对应于第1行的结尾100mb,它仍然在拆分2中,我们在拆分2中有28mb的行,所以远程读取剩余的72mb。
Map器c将处理split 3,开始是!=0所以跳过128mb-1byte之后的第一行,它对应于第2行的结尾200mb,这是文件的结尾,所以不要做任何事情。
Map器d与Map器c相同,只是它在192mb-1byte之后寻找一条换行符。

noj0wjuj

noj0wjuj5#

我认为它如下:inputformat负责将数据拆分为逻辑拆分,并考虑到数据的性质。
没有什么可以阻止它这样做,尽管它会给作业增加显著的延迟—所有围绕所需拆分大小边界的逻辑和读取都将发生在jobtracker中。
最简单的记录感知输入格式是textinputformat。它的工作原理如下(据我从代码中了解的情况)-输入格式按大小创建拆分,不考虑行,但linerecordreader始终:
a) 如果不是第一个拆分,则跳过拆分中的第一行(或部分)
b) 最后读取分割边界后的一行(如果数据可用,则不是最后一次分割)。

ocebsuys

ocebsuys6#

map reduce算法对文件的物理块不起作用。它适用于逻辑输入拆分。输入拆分取决于记录的写入位置。一个记录可以跨越两个Map器。
按照hdfs的设置方式,它将非常大的文件分解成大的块(例如,128mb大小),并将这些块的三个副本存储在集群中的不同节点上。
hdfs不知道这些文件的内容。一条记录可能在a区开始,但该记录的结尾可能在b区出现。
为了解决这个问题,hadoop使用了存储在文件块中的数据的逻辑表示,称为输入拆分。当mapreduce作业客户端计算输入拆分时,它会计算出块中第一个完整记录的开始位置和块中最后一个记录的结束位置。
关键点:
在块中最后一条记录不完整的情况下,输入分割包括下一块的位置信息和完成记录所需数据的字节偏移量。
请看下图。

看看这篇文章和相关的se问题:关于hadoop/hdfs文件分割
更多细节可以从文档中阅读
map reduce框架依赖于作业的inputformat来:
验证作业的输入规范。
将输入文件拆分为逻辑InputSplit,然后将每个逻辑InputSplit分配给一个单独的Map器。
然后将每个inputsplit分配给一个单独的Map器进行处理。拆分可以是元组。 InputSplit[] getSplits(JobConf job,int numSplits )是处理这些事情的api。
fileinputformat,扩展 InputFormat 实施 getSplits ()方法。请在grepcode中查看此方法的内部结构

相关问题