为什么sequencefile writer的append操作会用最后一个值覆盖所有值?

zpjtge22  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(323)

首先,考虑一下customwriter类:

public final class CustomWriter {

  private final SequenceFile.Writer writer;

  CustomWriter(Configuration configuration, Path outputPath) throws IOException {
    FileSystem fileSystem = FileSystem.get(configuration);
    if (fileSystem.exists(outputPath)) {
      fileSystem.delete(outputPath, true);
    }

    writer = SequenceFile.createWriter(configuration,
        SequenceFile.Writer.file(outputPath),
        SequenceFile.Writer.keyClass(LongWritable.class),
        SequenceFile.Writer.valueClass(ItemWritable.class),
        SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new DefaultCodec()),
        SequenceFile.Writer.blockSize(1024 * 1024),
        SequenceFile.Writer.bufferSize(fileSystem.getConf().getInt("io.file.buffer.size", 4 * 1024)),
        SequenceFile.Writer.replication(fileSystem.getDefaultReplication(outputPath)),
        SequenceFile.Writer.metadata(new SequenceFile.Metadata()));
  }

  public void close() throws IOException {
    writer.close();
  }

  public void write(Item item) throws IOException {
    writer.append(new LongWritable(item.getId()), new ItemWritable(item));
  }
}

我要做的是使用一个异步的 Item 键入对象。消费者引用了 CustomWriter 示例。然后它调用 CustomWriter#write 方法获取它接收的每个项。当溪流结束时 CustomWriter#close 方法来关闭编写器。
如您所见,我只创建了一个writer,它开始附加到一个全新的文件中。所以,毫无疑问这不是原因。
我还应该注意到,我目前正在单元测试环境中使用 MiniDFSCluster 按照这里的说明。如果我在一个非单元测试环境中运行它(即没有 MiniDFSCluster ),它似乎工作得很好。
当我试图读回文件时,我看到的只是最后一次写的 Item 对象n次(其中n是流中接收的项目总数)。举个例子:

sparkContext.hadoopFile(path, SequenceFileInputFormat.class, LongWritable.class, ItemWritable.class)
    .collect()
    .forEach(new BiConsumer<>() {
      @Override
      public void accept(Tuple2<LongWritable, ItemWritable> tuple) {
        LongWritable id = tuple._1();
        ItemWritable item = tuple._2();
        System.out.print(id.get() + " -> " + item.get());
      }
    });

这将打印如下内容:

...
1234 -> Item[...]
1234 -> Item[...]
1234 -> Item[...]
...

我做错什么了吗?或者,这是使用 MiniDFSCluster ?

kh212irz

kh212irz1#

Writable (例如 LongWritable, ItemWritable )在处理数据期间重用。收到记录时, Writable 通常只是替换它的内容,而您只会收到相同的内容 Writable 对象。如果要将它们收集到数组中,应该将它们复制到新对象中。

相关问题