hsync()不适用于sequencefile writer

qkf9rpyu  于 2021-05-30  发布在  Hadoop
关注(0)|答案(1)|浏览(389)

我有一个小程序,它每秒钟在hdfs上向一个块压缩的sequencefile写入10条记录,然后每5分钟运行sync(),以确保超过5分钟的所有记录都可以处理。
因为我的代码有很多行,所以我只提取了重要的部分:

// initialize

Configuration hdfsConfig = new Configuration();

CompressionCodecFactory codecFactory = new CompressionCodecFactory(hdfsConfig);
CompressionCodec compressionCodec = codecFactory.getCodecByName("default");

SequenceFile.Writer writer = SequenceFile.createWriter(
    hdfsConfig,
    SequenceFile.Writer.file(path),
    SequenceFile.Writer.keyClass(LongWritable.class),
    SequenceFile.Writer.valueClass(Text.class),
    SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK;, compressionCodec)
);

// ...

// append

LongWritable key = new LongWritable((new Date).getTime());
Text val = new Text("Some value");
writer.append(key, val);

// ...

// then every 5 minutes...

logger.info("about to sync...");
writer.hsync();
logger.info("synced!");

仅从日志来看,sync操作似乎与预期一样工作,但是hdfs上的文件仍然很小。一段时间后,可能会添加一些标题和一些事件,但甚至接近i hsync()的频率。文件关闭后,所有内容都将立即刷新。
在每次预期的同步之后,我们也尝试手动检查文件的内容,以查看数据是否存在,但是,文件在这里也显示为空:hdfs dfs-text filename
writer.hsync()不工作有什么已知的原因吗?如果有,有什么解决方法吗?
此问题的进一步测试用例:

import java.util.HashMap;
import java.util.Map;
import java.util.Date;
import java.util.Calendar;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

import java.io.IOException;

import java.text.SimpleDateFormat;
import java.text.DateFormat;
import java.text.ParseException;
import java.util.Calendar;
import java.util.Date;
import java.util.Locale;

public class WriteTest {
    private static final Logger LOG = LoggerFactory.getLogger(WriteTest.class);

    public static void main(String[] args) throws Exception {

        SequenceFile.CompressionType compressionType = SequenceFile.CompressionType.RECORD;
        CompressionCodec compressionCodec;
        String compressionCodecStr = "default";
        CompressionCodecFactory codecFactory;
        Configuration hdfsConfig = new Configuration();

        codecFactory = new CompressionCodecFactory(hdfsConfig);
        compressionCodec = codecFactory.getCodecByName(compressionCodecStr);

        String hdfsURL = "hdfs://10.0.0.1/writetest/";

        Date date = new Date();

        Path path = new Path(
            hdfsURL,
            "testfile" + date.getTime()
        );

        SequenceFile.Writer writer = SequenceFile.createWriter(
            hdfsConfig,
            SequenceFile.Writer.keyClass(LongWritable.class),
            SequenceFile.Writer.valueClass(Text.class),
            SequenceFile.Writer.compression(compressionType, compressionCodec),
            SequenceFile.Writer.file(path)
        );

        for(int i=0;i<10000000;i++) {

            Text value = new Text("New value!");
            LongWritable key = new LongWritable(date.getTime());

            writer.append(key, value);
            writer.hsync();

            Thread.sleep(1000);
        }

        writer.close();
    }
}

结果是在开始写入sequencefile头时有一个fsync,然后就没有更多的fsync了。文件关闭后,内容将写入光盘。

dgjrabp2

dgjrabp21#

这里有多个问题。
块压缩
当您对序列文件使用块压缩时,这意味着许多条目将被缓冲在内存中,然后在达到或超过限制时以块压缩形式写入 sync 手动调用。
当你打电话的时候 hsync 在它调用的writer上 hsync 在其基础上 FSDataOutputStream . 但是,这不会将压缩缓冲区中的数据写入内存。因此,要将数据可靠地发送到datanode,必须调用 sync 先打电话,再打电话 hsync .
注意,这样做意味着发送到datanode的块压缩部分包含的条目比通常少。这对压缩质量有负面影响,可能会导致更多的光盘使用(我想这就是原因 hsync 不呼叫 sync 内部。)
报告给namenode的文件大小
打电话 fsync 向datanode发送数据,但不向namenode报告新的文件大小。关于这方面的技术讨论可以在这里和这里找到。显然,每次更新长度都会对性能造成不利影响。有一个特殊版本的 hsync 它允许更新namenode信息,但不会被 SequenceFile.Writer .


* @param syncFlags

    *          Indicate the semantic of the sync. Currently used to specify
    *          whether or not to update the block length in NameNode.
    */
    public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
        flushOrSync(true, syncFlags);
    }

一方面,大小问题意味着即使某些工具报告的文件大小没有改变,数据仍然安全地到达datanodes,并且在打开它们上的inputstream时可以读取。另一方面,压缩类型的sequencefile.reader中有一个bug Record 以及 None . 对于这些压缩类型,读取器使用长度信息来确定读取的距离。因为此长度信息不会由更新 hsync 即使数据确实可用,它也会错误地停止读取。 Block 压缩阅读显然不使用长度信息,也没有遭受这个错误。

相关问题