有一些二进制文件需要拆分(根据一些逻辑)并分发给Map器。我使用hadoop流媒体。主要的问题是在不改变二进制数据块的情况下通过网络发送精确的二进制数据块。事实证明,发送原始字节并不是一件小事。
为了更好地说明这个问题,我写了一个非常简单的扩展 RecordReader
类,该类应该从拆分中读取一些字节并发送它们。二进制数据可以包含任何内容(包括换行符)。这是什么 next()
可能会读到:
public class MyRecordReader implements
RecordReader<BytesWritable, BytesWritable> {
...
public boolean next(BytesWritable key, BytesWritable ignore)
throws IOException {
...
byte[] result = new byte[8];
for (int i = 0; i < result.length; ++i)
result[i] = (byte)(i+1);
result[3] = (byte)'\n';
result[4] = (byte)'\n';
key.set(result, 0, result.length);
return true;
}
}
在这个场景中,每个调用 next()
函数应将以下字节序列写入stdin: 01 02 03 0a 0a 06 07 08
. 如果我使用类型化字节(hadoop-1722),那么序列的前缀应该是总共五个字节,第一个字节表示序列的类型(0表示字节),另外四个字节表示大小。所以序列应该是这样的: 00
00 00
00 08
01 02
03 0a
0a 06
07 08
.
我测试了一下 /bin/cat
要验证结果,命令如下:
hadoop jar <streaming jar location>
-libjars <my input format jar>
-D stream.map.input=typedbytes
-mapper /bin/cat
-inputformat my.input.Format
使用 hexdump
要查看传入的密钥,我得到了这个: 00
00 00
00 08
01 02
03 09
0a 09
0a 06
07 08
. 你可以看到 0a
(换行符)前缀为 09
(tab)typed bytes提供了(以前)有关字节序列的类型和大小的正确信息。
这给使用其他语言编写Map程序带来了一个严重的问题,因为字节在编写过程中会发生变化。
似乎不能保证字节将被发送完全一样,除非有另一个我遗漏了什么?
1条答案
按热度按时间vohkndzv1#
由于hadoop用户邮件列表中的一个非常有用的提示,我找到了这个问题的解决方案。
简而言之,我们需要重写hadoop io如何向标准流写入/读取数据。为此:
延伸
InputWriter
,OutputReader
,也提供您自己的InputFormat
以及OutputFormat
这样您就可以完全控制向流中写入字节和从流中读取字节的方式。延伸
IdentifierResolver
类来告诉hadoop使用自己的InputWriter
以及OutputReader
.使用你的
IdentifierResolver
,InputFormat
,和OuputFormat
具体如下:feature(未合并)mapreduce-5018中提供的补丁是如何做到这一点的一个很好的来源,可以根据需要进行定制。