java.lang.NullPointerException: writeSupportClass should not be null when writing Parquet files in a Spark Streaming job

xdnvmnnf posted on 2021-06-01 in Hadoop

In my Spark Streaming job, I use the following code snippet to save RDD data as Parquet files in HDFS:

readyToSave.foreachRDD((VoidFunction<JavaPairRDD<Void, MyProtoRecord>>) rdd -> {
    Configuration configuration = rdd.context().hadoopConfiguration();
    Job job = Job.getInstance(configuration);
    ParquetOutputFormat.setWriteSupportClass(job, ProtoWriteSupport.class);
    ProtoParquetOutputFormat.setProtobufClass(job, MyProtoRecord.class);
    rdd.saveAsNewAPIHadoopFile("path-to-hdfs", Void.class, MyProtoRecord.class, ParquetOutputFormat.class, configuration);
});

I get the following exception:

java.lang.NullPointerException: writeSupportClass should not be null
at parquet.Preconditions.checkNotNull(Preconditions.java:38)
at parquet.hadoop.ParquetOutputFormat.getWriteSupport(ParquetOutputFormat.java:326)
at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:272)
at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1112)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1095)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

How can I fix this problem?


pu82cl6c1#

Found the problem! When calling the saveAsNewAPIHadoopFile() method, you should pass the Job's configuration (job.getConfiguration()):

readyToSave.foreachRDD((VoidFunction<JavaPairRDD<Void, MyProtoRecord>>) rdd -> {
    Configuration configuration = rdd.context().hadoopConfiguration();
    Job job = Job.getInstance(configuration);
    ParquetOutputFormat.setWriteSupportClass(job, ProtoWriteSupport.class);
    ProtoParquetOutputFormat.setProtobufClass(job, MyProtoRecord.class);
    // Pass the Job's own configuration, not the original one
    rdd.saveAsNewAPIHadoopFile("path-to-hdfs", Void.class, MyProtoRecord.class, ParquetOutputFormat.class, job.getConfiguration());
});
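The reason this matters is that Job.getInstance(configuration) takes a copy of the passed-in Configuration, so ParquetOutputFormat.setWriteSupportClass(job, ...) only sets the write-support class on the Job's own copy; the original configuration object never sees it, and ParquetOutputFormat.getWriteSupport() then fails its null check. The following minimal sketch is not from the original post: the parquet.* package names follow the stack trace above (newer releases use org.apache.parquet.*), and "parquet.write.support.class" is assumed to be the property key that setWriteSupportClass writes in that version.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

import parquet.hadoop.ParquetOutputFormat;
import parquet.proto.ProtoWriteSupport;

public class WriteSupportConfigCheck {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Job.getInstance() clones "configuration"; later changes made through
        // the Job do not propagate back to the original object.
        Job job = Job.getInstance(configuration);
        ParquetOutputFormat.setWriteSupportClass(job, ProtoWriteSupport.class);

        // Present on the Job's own copy of the configuration:
        System.out.println(job.getConfiguration().get("parquet.write.support.class"));
        // Still null on the original object -- the one passed to
        // saveAsNewAPIHadoopFile() in the failing code, hence the NPE:
        System.out.println(configuration.get("parquet.write.support.class"));
    }
}

Running the sketch should print the write-support class name for the Job's copy and null for the original configuration, which matches the checkNotNull failure in the stack trace.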
