We want to write compressed data to HDFS using Flink's BucketingSink or StreamingFileSink. I have written my own Writer, and it works fine as long as there are no failures. However, when the job hits a failure and restarts from a checkpoint, the sink either generates a valid-length file (Hadoop < 2.7) or truncates the part file. Unfortunately gzip files are binary and carry a trailer at the end of the file, so simple truncation does not work in my case. Is there any way to get exactly-once semantics for compressed data with the HDFS sink?
Here is the code of my Writer:
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.alibaba.fastjson.JSONObject; // assumed: fastjson, which has containsKey()/getString()

public class HdfsCompressStringWriter extends StreamWriterBaseV2<JSONObject> {

    private static final long serialVersionUID = 2L;

    /** The {@code GZIPOutputStream} wrapping the current part file. */
    private transient GZIPOutputStream compressionOutputStream;

    public HdfsCompressStringWriter() {}

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        this.setSyncOnFlush(true);
        // syncFlush = true so that flush() pushes the compressed bytes through to HDFS
        compressionOutputStream = new GZIPOutputStream(this.getStream(), true);
    }

    @Override
    public void close() throws IOException {
        if (compressionOutputStream != null) {
            compressionOutputStream.close();
            compressionOutputStream = null;
        }
        resetStream();
    }

    @Override
    public void write(JSONObject element) throws IOException {
        if (element == null || !element.containsKey("body")) {
            return;
        }
        String content = element.getString("body") + "\n";
        compressionOutputStream.write(content.getBytes(StandardCharsets.UTF_8));
        compressionOutputStream.flush();
    }

    @Override
    public Writer<JSONObject> duplicate() {
        return new HdfsCompressStringWriter();
    }
}
1 Answer
I would recommend implementing a BulkWriter for the StreamingFileSink that pipes the elements through a GZIPOutputStream. The code could look like the following:
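A minimal sketch of that approach, assuming the fastjson JSONObject and the "body" field from the question; the class name GzipJsonBulkWriter is illustrative. BulkWriter and BulkWriter.Factory come from org.apache.flink.api.common.serialization:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;

import com.alibaba.fastjson.JSONObject;

public class GzipJsonBulkWriter implements BulkWriter<JSONObject> {

    private final GZIPOutputStream out;

    public GzipJsonBulkWriter(GZIPOutputStream out) {
        this.out = out;
    }

    @Override
    public void addElement(JSONObject element) throws IOException {
        // Same record format as the question's writer: one "body" per line
        if (element != null && element.containsKey("body")) {
            out.write((element.getString("body") + "\n").getBytes(StandardCharsets.UTF_8));
        }
    }

    @Override
    public void flush() throws IOException {
        out.flush();
    }

    @Override
    public void finish() throws IOException {
        // Writes the gzip trailer. Do NOT close the stream here;
        // the sink closes the underlying part file itself.
        out.finish();
    }

    /** Factory the sink uses to open one writer per part file. */
    public static class Factory implements BulkWriter.Factory<JSONObject> {
        @Override
        public BulkWriter<JSONObject> create(FSDataOutputStream stream) throws IOException {
            // syncFlush = true so flush() forces compressed bytes out
            return new GzipJsonBulkWriter(new GZIPOutputStream(stream, true));
        }
    }
}

Wiring it into a job could then look like this, where jsonStream stands for any DataStream<JSONObject> and the output path is a placeholder:

import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

StreamingFileSink<JSONObject> sink = StreamingFileSink
        .forBulkFormat(new Path("hdfs:///tmp/output"), new GzipJsonBulkWriter.Factory())
        .build();

jsonStream.addSink(sink);

Note that checkpointing must be enabled for the StreamingFileSink to commit files. Bulk formats always roll the part file on checkpoint, so every committed file is a complete gzip file with a valid trailer, and recovery never needs to truncate anything.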