使用可变长度/非分隔二进制文件的hadoop中的gis拆分

l5tcr1uw  于 2021-06-04  发布在  Hadoop
关注(0)|答案(1)|浏览(396)

我刚刚开始为开放街道Map数据开发一个基于hadoop的摄取器。有几种格式-但我一直针对基于protocolbuffer的格式(注意-它不是纯pb)。
在我看来,将文件预拆分为序列文件会更有效—而不是以自定义记录读取器/输入格式处理可变长度编码—但需要进行健全性检查。
格式在pbf format description中有更详细的描述,但基本上它是[blobheader,blob]块的集合。
有一个水滴头

message BlobHeader {
   required string type = 1;
   optional bytes indexdata = 2;
   required int32 datasize = 3;
 }

然后是blob(其大小由标头中的datasize参数定义)

message Blob {
   optional bytes raw = 1; // No compression
   optional int32 raw_size = 2; // Only set when compressed, to the uncompressed size
   optional bytes zlib_data = 3;
   // optional bytes lzma_data = 4; // PROPOSED.
   // optional bytes OBSOLETE_bzip2_data = 5; // Deprecated.
 }

很明显,一旦深入到blob中,就会有更多的结构——但我会在mapper中处理这个问题——我想做的是,最初每个mapper有一个blob(稍后可能是每个mapper blob的倍数)。
其他一些输入格式/记录读取器使用“足够大”的拆分大小,然后向后/向前查找分隔符-但是由于没有分隔符可以让我知道blob/headers的偏移量-也没有指向它们的索引-我看不到任何方法来获取拆分点,而不首先对文件进行流式处理。
现在,我不需要从磁盘上读取整个文件—我可以从读取头开始,使用该信息查找blob,将其设置为第一个分割点,然后重复。但这是我能想到的唯一一个可以替代预分割成序列文件的方法。
有没有更好的方法来处理这个问题?如果没有,对这两个建议有什么想法?

toiithl6

toiithl61#

好吧,我用getsplits方法解析二进制文件——由于跳过了99%以上的数据,所以速度非常快(planet osm 22gb world文件大约20秒)。如果其他人遇到了问题,下面是getsplits方法。

@Override
public List<InputSplit> getSplits(JobContext context){
    List<InputSplit> splits = new ArrayList<InputSplit>();
    FileSystem fs = null;
    Path file = OSMPBFInputFormat.getInputPaths(context)[0]; 
    FSDataInputStream in = null;
    try {
        fs = FileSystem.get(context.getConfiguration());
        in = fs.open(file);
        long pos = 0;
        while (in.available() > 0){
            int len = in.readInt(); 
            byte[] blobHeader = new byte[len]; 
            in.read(blobHeader);
            BlobHeader h = BlobHeader.parseFrom(blobHeader);
            FileSplit split = new FileSplit(file, pos,len + h.getDatasize(), new String[] {});
            splits.add(split);
            pos += 4;
            pos += len;
            pos += h.getDatasize();
            in.skip(h.getDatasize());
        }
    } catch (IOException e) {
        sLogger.error(e.getLocalizedMessage());
    } finally {
        if (in != null) {try {in.close();}catch(Exception e){}};
        if (fs != null) {try {fs.close();}catch(Exception e){}};
    }
    return splits;
}

工作良好,迄今为止-虽然我还没有地面truthed输出尚未。它肯定比将pbf复制到hdfs、在一个Map器中转换成一个序列,然后摄取(复制时间占主导)要快得多。它也比在hdfs中使用外部程序拷贝到序列文件,然后针对hdfs运行Map程序(后者编写脚本)快20%。所以这里没有抱怨。
请注意,这将为每个块生成一个Map器,即行星世界文件的~23kMap器。实际上,我是在每次拆分时捆绑多个块—只需在拆分添加到集合之前循环x次。
对于blobheader,我刚刚从上面的osmwiki链接编译了protobuf.proto文件。如果需要,也可以从osm二进制类中提取预生成的-maven片段是:

<dependency>
    <groupId>org.openstreetmap.osmosis</groupId>
    <artifactId>osmosis-osm-binary</artifactId>
    <version>0.43-RELEASE</version>
</dependency>

相关问题