如何使用apache flink在hdfs上按datetime分区编写parquet文件?

yhived7q  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(447)

在从kafka获取json数据并将其保存为parquet文件并将其加载到hive中的过程中,我遇到了flink bucketingsink中提到的同一问题,即使用自定义avroparquetwriter创建空文件。有人知道怎么解决吗?谢谢您。我使用了ApacheFlink1.4.0+HDFS2.7.3

ffdz8vbo

ffdz8vbo1#

您可以直接实现 Writer 接口。它可以如下所示:

import org.apache.flink.util.Preconditions;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;

/**
 * Parquet writer.
 *
 * @param <T>
 */
public class ParquetSinkWriter<T extends GenericRecord> implements Writer<T> {

    private static final long serialVersionUID = -975302556515811398L;

    private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
    private final int pageSize = 64 * 1024;

    private final String schemaRepresentation;

    private transient Schema schema;
    private transient ParquetWriter<GenericRecord> writer;
    private transient Path path;

    private int position;

    public ParquetSinkWriter(String schemaRepresentation) {
        this.schemaRepresentation = Preconditions.checkNotNull(schemaRepresentation);
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        this.position = 0;
        this.path = path;

        if (writer != null) {
            writer.close();
        }

        writer = createWriter();
    }

    @Override
    public long flush() throws IOException {
        Preconditions.checkNotNull(writer);
        position += writer.getDataSize();
        writer.close();
        writer = createWriter();

        return position;
    }

    @Override
    public long getPos() throws IOException {
        Preconditions.checkNotNull(writer);
        return position + writer.getDataSize();
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
            writer = null;
        }
    }

    @Override
    public void write(T element) throws IOException {
        Preconditions.checkNotNull(writer);
        writer.write(element);
    }

    @Override
    public Writer<T> duplicate() {
        return new ParquetSinkWriter<>(schemaRepresentation);
    }

    private ParquetWriter<GenericRecord> createWriter() throws IOException {
        if (schema == null) {
            schema = new Schema.Parser().parse(schemaRepresentation);
        }

        return AvroParquetWriter.<GenericRecord>builder(path)
            .withSchema(schema)
            .withDataModel(new GenericData())
            .withCompressionCodec(compressionCodecName)
            .withPageSize(pageSize)
            .build();
    }
}

相关问题