How can Flink's HDFS sink be made fault-tolerant when writing gzip-compressed data?

nfeuvbwi  posted on 2021-06-21  in  Flink

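We want to write compressed data to HDFS through Flink's BucketingSink or StreamingFileSink. I have written my own Writer, which works fine as long as nothing fails. However, when the job hits a failure and restarts from a checkpoint, the sink either generates a valid-length file (Hadoop < 2.7) or truncates the part file. Unfortunately, gzip is a binary format with a trailer at the end of the file, so simple truncation does not work in my case. Any ideas on how to get exactly-once semantics for a compressed HDFS sink?
Here is the code of my writer: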

public class HdfsCompressStringWriter extends StreamWriterBaseV2<JSONObject> {

private static final long serialVersionUID = 2L;

/**
 * The {@code GZIPOutputStream} wrapping the stream of the current part file.
 */
private transient GZIPOutputStream compressionOutputStream;

public HdfsCompressStringWriter() {}

@Override
public void open(FileSystem fs, Path path) throws IOException {
    super.open(fs, path);
    this.setSyncOnFlush(true);
    compressionOutputStream = new GZIPOutputStream(this.getStream(), true);
}

@Override
public void close() throws IOException {
    if (compressionOutputStream != null) {
        compressionOutputStream.close();
        compressionOutputStream = null;
    }
    resetStream();
}

@Override
public void write(JSONObject element) throws IOException {
    if (element == null || !element.containsKey("body")) {
        return;
    }
    String content = element.getString("body") + "\n";
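    // Use an explicit charset so the output does not depend on the JVM default.
    compressionOutputStream.write(content.getBytes(java.nio.charset.StandardCharsets.UTF_8));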
    compressionOutputStream.flush();
}

@Override
public Writer<JSONObject> duplicate() {
    return new HdfsCompressStringWriter();
}

}
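To make the truncation problem concrete, here is a minimal sketch that uses only the JDK (no Flink or HDFS). `GzipTruncationDemo` and its methods are hypothetical names made up for illustration. It writes two records with a sync flush after each one, remembers the byte length right after the first flush (standing in for the length a checkpoint would record), and then tries to decompress both the complete file and the file truncated to that length.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class; not part of Flink or of the writer above.
final class GzipTruncationDemo {

    // Returns {full file bytes, same bytes truncated to the length seen after the first flush}.
    static byte[][] writeAndTruncate() throws IOException {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        int lengthAfterFirstFlush;
        try (GZIPOutputStream gzip = new GZIPOutputStream(file, true)) {
            gzip.write("first\n".getBytes(StandardCharsets.UTF_8));
            gzip.flush();
            // Stand-in for the "valid length" recorded at a checkpoint.
            lengthAfterFirstFlush = file.size();
            gzip.write("second\n".getBytes(StandardCharsets.UTF_8));
            gzip.flush();
        } // close() finishes the deflate stream and writes the gzip trailer
        byte[] full = file.toByteArray();
        return new byte[][] {full, Arrays.copyOf(full, lengthAfterFirstFlush)};
    }

    // Returns true only if the bytes decompress cleanly all the way to the end.
    static boolean isReadable(byte[] data) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] buffer = new byte[1024];
            while (in.read(buffer) != -1) {
                // discard the decompressed bytes; only the outcome matters here
            }
            return true;
        } catch (IOException e) {
            // A file cut off before the trailer ends up here.
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[][] files = writeAndTruncate();
        System.out.println("full file readable:      " + isReadable(files[0]));
        System.out.println("truncated file readable: " + isReadable(files[1]));
    }
}
```

The truncated copy still begins with a valid gzip header and the first record, but the reader runs out of input before it reaches the end of the deflate stream and the trailer, so a standard gzip reader rejects the file.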

holgip5t  #1

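I would suggest implementing a BulkWriter for the StreamingFileSink that compresses the elements via a GZIPOutputStream. As far as I know, bulk formats roll the part file on every checkpoint, so each file is finished (trailer included) before it is committed, and recovery does not have to truncate a half-written gzip stream. The code could look like this: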

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(1000);

    final DataStream<Integer> input = env.addSource(new InfinitySource());

    final StreamingFileSink<Integer> streamingFileSink = StreamingFileSink
            .<Integer>forBulkFormat(new Path("output"), new GzipBulkWriterFactory<>())
            .build();
    input.addSink(streamingFileSink);

    env.execute();
}

private static class GzipBulkWriterFactory<T> implements BulkWriter.Factory<T> {
    @Override
    public BulkWriter<T> create(FSDataOutputStream fsDataOutputStream) throws IOException {
        final GZIPOutputStream gzipOutputStream = new GZIPOutputStream(fsDataOutputStream, true);
        return new GzipBulkWriter<>(new ObjectOutputStream(gzipOutputStream), gzipOutputStream);
    }
}

private static class GzipBulkWriter<T> implements BulkWriter<T> {

    private final GZIPOutputStream gzipOutputStream;
    private final ObjectOutputStream objectOutputStream;

    public GzipBulkWriter(ObjectOutputStream objectOutputStream, GZIPOutputStream gzipOutputStream) {
        this.gzipOutputStream = gzipOutputStream;
        this.objectOutputStream = objectOutputStream;
    }

    @Override
    public void addElement(T t) throws IOException {
        objectOutputStream.writeObject(t);
    }

    @Override
    public void flush() throws IOException {
        objectOutputStream.flush();
    }

    @Override
    public void finish() throws IOException {
        objectOutputStream.flush();
        gzipOutputStream.finish();
    }
}
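The writer above uses Java serialization, while the question writes newline-terminated text. Below is a rough, JDK-only sketch of the same idea for text lines. `LineGzipWriter` and `CloseTrackingStream` are hypothetical names; the class only mirrors the three `BulkWriter` methods (`addElement`, `flush`, `finish`) and does not implement the Flink interface, so it would still need to be adapted to `BulkWriter<String>` and a matching factory. The point it illustrates is that `GZIPOutputStream.finish()` writes the trailer without closing the underlying stream, which matters because `BulkWriter.finish()` is documented as not being allowed to close the stream it writes to.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical stand-in for a BulkWriter<String>; no Flink dependency.
final class LineGzipWriter {

    private final GZIPOutputStream gzip;

    LineGzipWriter(OutputStream target) throws IOException {
        // syncFlush = true so that flush() pushes pending compressed data downstream
        this.gzip = new GZIPOutputStream(target, true);
    }

    void addElement(String line) throws IOException {
        gzip.write((line + "\n").getBytes(StandardCharsets.UTF_8));
    }

    void flush() throws IOException {
        gzip.flush();
    }

    // Completes the gzip stream (trailer included) but leaves the target open.
    void finish() throws IOException {
        gzip.finish();
    }

    // In-memory target that records whether close() was called on it.
    static final class CloseTrackingStream extends ByteArrayOutputStream {
        boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    // Decompresses a complete gzip byte array back into text.
    static String decompress(byte[] data) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] buffer = new byte[1024];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        CloseTrackingStream target = new CloseTrackingStream();
        LineGzipWriter writer = new LineGzipWriter(target);
        writer.addElement("first");
        writer.addElement("second");
        writer.finish();
        System.out.print(decompress(target.toByteArray()));
        System.out.println("target closed by writer: " + target.closed);
    }
}
```

Calling `close()` on the `GZIPOutputStream` instead of `finish()` would also close the stream handed in by the sink, which is why the sketch sticks to `finish()`.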
