如何使用flink将数据集作为Parquet文件写入s3 bucket。有没有像spark:df.write.parquet(“write-in-parquet”)这样的直接函数
请帮助我如何写Parquet格式的flink数据集。
尝试将数据集转换为(void,genericord)时遇到了问题
DataSet<Tuple2<Void,GenericRecord>> df = allEvents.flatMap(new FlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<Void, GenericRecord>>() {
@Override
public void flatMap(Tuple2<LongWritable, Text> longWritableTextTuple2, Collector<Tuple2<Void, GenericRecord>> collector) throws Exception {
JsonAvroConverter converter = new JsonAvroConverter();
Schema schema = new Schema.Parser().parse(new File("test.avsc"));
try {
GenericRecord record = converter.convertToGenericDataRecord(longWritableTextTuple2.f1.toString().getBytes(), schema);
collector.collect( new Tuple2<Void,GenericRecord>(null,record));
}
catch (Exception e) {
System.out.println("error in converting to avro")
}
}
});
Job job = Job.getInstance();
HadoopOutputFormat parquetFormat = new HadoopOutputFormat<Void, GenericRecord>(new AvroParquetOutputFormat(), job);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
df.output(parquetFormat);
env.execute();
请帮助我做错事。我得到了例外,这个代码不工作。
3条答案
按热度按时间jslywgbw1#
您将通过
new HadoopOutputFormat(parquetOutputFormat, job)
,然后传递给DataSet.output()
.这份工作来自。。。
这个
parquetOutputFormat
通过以下方式创建:看到了吗https://javadoc.io/doc/org.apache.parquet/parquet-hadoop/1.10.1/org/apache/parquet/hadoop/parquetoutputformat.html
byqmnocz2#
你没有说你得到了哪一个例外,但这里有一个完整的例子如何实现这一点。
要点是:
使用
org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat
从依赖关系org.apache.flink:flink-hadoop-compatibility_2.11:1.11.0
HadoopOutputFormat
是一个适配器,允许您使用为hadoop开发的输出格式你需要一个
DataSet<Tuple2<Void,IndexedRecord>>
,因为hadoopOutputFormat<K,V>
与键值对一起工作,我们不感兴趣的键因此使用Void
对于键类型,值必须是avro的IndexedRecord
或者GenericRecord
.使用
org.apache.parquet.avro.AvroParquetOutputFormat<IndexedRecord>
从依赖关系org.apache.parquet:parquet-avro:1.11.1
这个hadoop的outputformat生成parquet它继承自
org.apache.parquet.hadoop.FileOutputFormat<Void, IndexedRecord>
创建自己的子类IndexedRecord
你不能使用new GenericData.Record(schema)
因为这样的记录是不可序列化的java.io.NotSerializableException: org.apache.avro.Schema$Field is not serializable
flink要求它是可序列化的。你还需要提供一个
getSchema()
方法,但您可以null
或返回Schema
在静态成员中保存的(这样就不需要序列化它,并且可以避免java.io.NotSerializableException: org.apache.avro.Schema$Field is not serializable
)源代码
这个
schema.avsc
只是以及依赖关系:
eit6fx6z3#
比Spark更复杂一点。我能够在flink中读写Parquet地板数据的唯一方法是通过hadoop&mapreduce兼容性。你需要
hadoop-mapreduce-client-core
以及flink-hadoop-compatibility
在你的依赖中。然后你需要创建一个适当的HadoopOutoutFormat
. 你需要这样做:然后你可以做: