parquet格式hdfs-write

jfewjypa  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(317)

我试图转换一个Kafka消息,这是一个巨大的rdd到Parquet格式,并保存在hdfs使用Spark流。它是一个syslog消息,比如每行中的name1=value1 | name2=value2 | name3=value3,有没有关于如何在spark流中实现这一点的指针?

toe95027

toe950271#

你可以保存一个 RDD Parquet而不转换为 DataFrame 只要你有一个 avro 它的架构
下面是一个示例函数:

public <T> void save(JavaRDD<T> rdd, Class<T> clazz, Time timeStamp, Schema schema, String path) throws IOException {
    Job job = Job.getInstance();
    ParquetOutputFormat.setWriteSupportClass(job, AvroWriteSupport.class);
    AvroParquetOutputFormat.setSchema(job, schema);
    LazyOutputFormat.setOutputFormatClass(job, new ParquetOutputFormat<T>().getClass());
    job.getConfiguration().set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false"); 
    job.getConfiguration().set("parquet.enable.summary-metadata", "false"); 

    //save the file
    rdd.mapToPair(me -> new Tuple2(null, me))
            .saveAsNewAPIHadoopFile(
                    String.format("%s/%s", path, timeStamp.milliseconds()),
                    Void.class,
                    clazz,
                    LazyOutputFormat.class,
                    job.getConfiguration());
}
deyfvvtc

deyfvvtc2#

首先转换 RDDDataFrame . 然后保存。

相关问题