How do I write a DataSet as Parquet files to S3 in Flink?

iih3973s asked on 2021-06-24 in Flink

How can I write a DataSet as Parquet files to an S3 bucket using Flink? Is there a direct function like Spark's df.write.parquet("write-in-parquet")?
Please help me with how to write a Flink DataSet in Parquet format.
I ran into a problem when trying to convert the DataSet to (Void, GenericRecord):

DataSet<Tuple2<Void, GenericRecord>> df = allEvents.flatMap(new FlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<Void, GenericRecord>>() {
    @Override
    public void flatMap(Tuple2<LongWritable, Text> longWritableTextTuple2, Collector<Tuple2<Void, GenericRecord>> collector) throws Exception {
        JsonAvroConverter converter = new JsonAvroConverter();
        Schema schema = new Schema.Parser().parse(new File("test.avsc"));
        try {
            GenericRecord record = converter.convertToGenericDataRecord(longWritableTextTuple2.f1.toString().getBytes(), schema);
            collector.collect(new Tuple2<Void, GenericRecord>(null, record));
        } catch (Exception e) {
            System.out.println("error in converting to avro");
        }
    }
});
Job job = Job.getInstance();
HadoopOutputFormat<Void, GenericRecord> parquetFormat = new HadoopOutputFormat<>(new AvroParquetOutputFormat(), job);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
df.output(parquetFormat);
env.execute();

Please help me figure out what I am doing wrong. I am getting an exception and this code does not work.


jslywgbw1#

You wrap it with new HadoopOutputFormat(parquetOutputFormat, job), which you then pass to DataSet.output().
The job comes from...

import org.apache.hadoop.mapreduce.Job;

...

Job job = Job.getInstance();

The parquetOutputFormat is created via:

import org.apache.parquet.hadoop.ParquetOutputFormat;

...
ParquetOutputFormat<MyOutputType> parquetOutputFormat = new ParquetOutputFormat<>();

See https://javadoc.io/doc/org.apache.parquet/parquet-hadoop/1.10.1/org/apache/parquet/hadoop/ParquetOutputFormat.html
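
Putting those two pieces together for the DataSet<Tuple2<Void, GenericRecord>> from the question, the wiring could look like the sketch below. It is only a sketch: df, outputPath and schema are placeholders, and it uses AvroParquetOutputFormat from parquet-avro instead of a bare ParquetOutputFormat, since the Avro variant already knows how to write GenericRecord values (a plain ParquetOutputFormat would still need a write-support class configured).

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.parquet.avro.AvroParquetOutputFormat;

...

Job job = Job.getInstance();

// the Hadoop OutputFormat that actually writes Parquet (Avro flavour)
AvroParquetOutputFormat<GenericRecord> parquetOutputFormat = new AvroParquetOutputFormat<>();
// the writer needs to know the Avro schema up front (schema is a placeholder here)
AvroParquetOutputFormat.setSchema(job, schema);

// adapter so the Flink DataSet API can drive a Hadoop OutputFormat
HadoopOutputFormat<Void, GenericRecord> hadoopOutputFormat =
        new HadoopOutputFormat<>(parquetOutputFormat, job);

// outputPath is a placeholder, e.g. an s3a:// URI
FileOutputFormat.setOutputPath(job, new Path(outputPath));

// df is the DataSet<Tuple2<Void, GenericRecord>> from the question
df.output(hadoopOutputFormat);
env.execute();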


byqmnocz2#

You didn't say which exception you are getting, but here is a complete example of how to achieve this.
The key points are:
- Use org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat from the dependency org.apache.flink:flink-hadoop-compatibility_2.11:1.11.0. HadoopOutputFormat is an adapter that lets you use output formats developed for Hadoop.
- You need a DataSet<Tuple2<Void, IndexedRecord>>, because Hadoop's OutputFormat<K, V> works with key-value pairs. We are not interested in the key, so Void is used for the key type; the value must be Avro's IndexedRecord or GenericRecord.
- Use org.apache.parquet.avro.AvroParquetOutputFormat<IndexedRecord> from the dependency org.apache.parquet:parquet-avro:1.11.1. This Hadoop OutputFormat produces Parquet and inherits from org.apache.parquet.hadoop.FileOutputFormat<Void, IndexedRecord>.
- Create your own subclass of IndexedRecord. You cannot use new GenericData.Record(schema), because such a record is not serializable (java.io.NotSerializableException: org.apache.avro.Schema$Field is not serializable) and Flink requires it to be serializable.
- You also need to provide a getSchema() method; it can return null, or return a Schema kept in a static member (that way it does not need to be serialized and you avoid the java.io.NotSerializableException).
Source code:

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.parquet.avro.AvroParquetOutputFormat;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class MyParquetTest implements Serializable {

    public static void main(String[] args) throws Exception {
        new MyParquetTest().start();

    }

    private void start() throws Exception {

        final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        Configuration parameters = new Configuration();

        Stream<String> stringStream = IntStream.range(1, 100).mapToObj(n -> String.format("Entry %d", n));
        DataSet<String> text = env.fromCollection(stringStream.collect(Collectors.toCollection(ArrayList::new)));

        Job job = Job.getInstance();
        HadoopOutputFormat<Void, IndexedRecord>  hadoopOutputFormat = new HadoopOutputFormat<>(new AvroParquetOutputFormat<IndexedRecord>(), job);
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, CompressionCodecName.SNAPPY.getHadoopCompressionCodecClass());
        FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path("./my-parquet"));
        final Schema schema = new Schema.Parser().parse(MyRecord.class.getClassLoader().getResourceAsStream("schema.avsc"));
        AvroParquetOutputFormat.setSchema(job, schema);

        DataSet<Tuple2<Void, IndexedRecord>> text2 = text.map(new MapFunction<String, Tuple2<Void, IndexedRecord>>() {
            @Override
            public Tuple2<Void, IndexedRecord> map(String value) throws Exception {
                return Tuple2.of(null, new MyRecord(value));
//              IndexedRecord record = new GenericData.Record(schema); // won't work because Schema$Field is not serializable
//              record.put(0, value);
//              return Tuple2.of(null, record);
            }
        });

        text2.output(hadoopOutputFormat);
        env.execute("Flink Batch Java API Skeleton");
    }

    public static class MyRecord implements IndexedRecord {

        private static  Schema schema;

        static {
            try {
                schema = new Schema.Parser().parse(MyRecord.class.getClassLoader().getResourceAsStream("schema.avsc"));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        private final String value;

        public MyRecord(String value) {
            this.value= value;
        }

        @Override
        public void put(int i, Object v) {
            throw new NotImplementedException("You can't update this IndexedRecord");
        }

        @Override
        public Object get(int i) {
            return this.value;
        }

        @Override
        public Schema getSchema() {
            return schema; // or just return null and remove the schema member
        }
    }
}

The schema.avsc is just:

{
  "name": "aa",
  "type": "record",
  "fields": [
    {"name": "value", "type": "string"}
  ]
}

And the dependencies:

implementation "org.apache.flink:flink-java:${flinkVersion}"
implementation "org.apache.flink:flink-avro:${flinkVersion}"
implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
implementation "org.apache.flink:flink-hadoop-compatibility_${scalaBinaryVersion}:${flinkVersion}"
implementation "org.apache.parquet:parquet-avro:1.11.1"
implementation "org.apache.hadoop:hadoop-client:2.8.3"

eit6fx6z3#

It is a bit more complicated than in Spark. The only way I was able to read and write Parquet data in Flink is through the Hadoop & MapReduce compatibility layer. You need hadoop-mapreduce-client-core and flink-hadoop-compatibility in your dependencies. Then you need to create a proper HadoopOutputFormat. You do it like this:

val job = Job.getInstance()
val hadoopOutFormat = new hadoop.mapreduce.HadoopOutputFormat[Void, SomeType](new AvroParquetOutputFormat(), job)
FileOutputFormat.setOutputPath(job, [somePath])

Then you can do:

dataStream.writeUsingOutputFormat(hadoopOutFormat)
