我试图转换一个Kafka消息,这是一个巨大的rdd到Parquet格式,并保存在hdfs使用Spark流。它是一个syslog消息,比如每行中的name1=value1 | name2=value2 | name3=value3,有没有关于如何在spark流中实现这一点的指针?
toe950271#
你可以保存一个 RDD Parquet而不转换为 DataFrame 只要你有一个 avro 它的架构下面是一个示例函数:
RDD
DataFrame
avro
public <T> void save(JavaRDD<T> rdd, Class<T> clazz, Time timeStamp, Schema schema, String path) throws IOException { Job job = Job.getInstance(); ParquetOutputFormat.setWriteSupportClass(job, AvroWriteSupport.class); AvroParquetOutputFormat.setSchema(job, schema); LazyOutputFormat.setOutputFormatClass(job, new ParquetOutputFormat<T>().getClass()); job.getConfiguration().set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false"); job.getConfiguration().set("parquet.enable.summary-metadata", "false"); //save the file rdd.mapToPair(me -> new Tuple2(null, me)) .saveAsNewAPIHadoopFile( String.format("%s/%s", path, timeStamp.milliseconds()), Void.class, clazz, LazyOutputFormat.class, job.getConfiguration()); }
deyfvvtc2#
首先转换 RDD 到 DataFrame . 然后保存。
2条答案
按热度按时间toe950271#
你可以保存一个
RDD
Parquet而不转换为DataFrame
只要你有一个avro
它的架构下面是一个示例函数:
deyfvvtc2#
首先转换
RDD
到DataFrame
. 然后保存。