While reading JSON data from Kafka, writing it out as Parquet files, and loading those files into Hive, I ran into the same problem that has been reported for Flink's BucketingSink: a custom AvroParquetWriter produces empty files. Does anyone know how to solve this? Thank you. I am using Apache Flink 1.4.0 + HDFS 2.7.3.
ffdz8vbo1#
You can implement the Writer interface directly. It could look like this:
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.flink.util.Preconditions;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;

/**
 * Parquet writer.
 *
 * @param <T> type of the records to write
 */
public class ParquetSinkWriter<T extends GenericRecord> implements Writer<T> {

    private static final long serialVersionUID = -975302556515811398L;

    private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
    private final int pageSize = 64 * 1024;

    // The schema is kept as a String because Avro's Schema is not serializable.
    private final String schemaRepresentation;

    private transient Schema schema;
    private transient ParquetWriter<GenericRecord> writer;
    private transient Path path;

    private int position;

    public ParquetSinkWriter(String schemaRepresentation) {
        this.schemaRepresentation = Preconditions.checkNotNull(schemaRepresentation);
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        this.position = 0;
        this.path = path;

        if (writer != null) {
            writer.close();
        }

        writer = createWriter();
    }

    @Override
    public long flush() throws IOException {
        Preconditions.checkNotNull(writer);
        // ParquetWriter only guarantees its data is on disk after close(),
        // so close the current writer and open a fresh one for later records.
        position += writer.getDataSize();
        writer.close();
        writer = createWriter();

        return position;
    }

    @Override
    public long getPos() throws IOException {
        Preconditions.checkNotNull(writer);
        return position + writer.getDataSize();
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
            writer = null;
        }
    }

    @Override
    public void write(T element) throws IOException {
        Preconditions.checkNotNull(writer);
        writer.write(element);
    }

    @Override
    public Writer<T> duplicate() {
        return new ParquetSinkWriter<>(schemaRepresentation);
    }

    private ParquetWriter<GenericRecord> createWriter() throws IOException {
        if (schema == null) {
            schema = new Schema.Parser().parse(schemaRepresentation);
        }

        return AvroParquetWriter.<GenericRecord>builder(path)
            .withSchema(schema)
            .withDataModel(new GenericData())
            .withCompressionCodec(compressionCodecName)
            .withPageSize(pageSize)
            .build();
    }
}
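For context (this wiring is not part of the original answer), here is a minimal sketch of how such a writer is typically attached to a BucketingSink in Flink 1.4; the output path, the bucketing date format, the batch size, and the schemaJson parameter are illustrative assumptions you would replace with your own values:

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class ParquetSinkExample {

    // Attaches the custom Parquet writer to a BucketingSink.
    // The stream of GenericRecord is assumed to come from your Kafka/JSON pipeline.
    public static void attachSink(DataStream<GenericRecord> records, String schemaJson) {
        BucketingSink<GenericRecord> sink = new BucketingSink<>("hdfs:///tmp/parquet-out");
        // One directory per day, which maps naturally to a Hive date partition.
        sink.setBucketer(new DateTimeBucketer<GenericRecord>("yyyy-MM-dd"));
        sink.setWriter(new ParquetSinkWriter<GenericRecord>(schemaJson));
        // Roll part files at ~128 MB to avoid many small files in HDFS/Hive.
        sink.setBatchSize(128 * 1024 * 1024);
        records.addSink(sink);
    }
}

Note the design choice in flush() above: ParquetWriter buffers row groups in memory and only guarantees the data is fully on disk once close() writes the file footer, so flush() closes the current writer and immediately opens a new one. A writer that never closes until the job ends is a common cause of the empty files described in the question.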