使用spark上下文将Parquet文件读取为rdd(不使用spark sql上下文),给出异常

djmepvbi  于 2021-06-02  发布在  Hadoop
关注(0)|答案(0)|浏览(356)

我试图读写Parquet文件作为rdd使用Spark。我不能在我当前的应用程序中使用sparksql上下文(它需要structtype中的parquet模式,当我从avro模式转换时,在少数情况下会给我castexception)
因此,如果我尝试通过重载avroparquetformat并将parquetinputformat发送到hadoop来实现和保存parquet文件,以以下方式编写:

def saveAsParquetFile[T <: IndexedRecord](records: RDD[T], path: String)(implicit m: ClassTag[T]) = {
            val keyedRecords: RDD[(Void, T)] = records.map(record => (null, record))
            spark.hadoopConfiguration.setBoolean("parquet.enable.summary-metadata", false)
            val job = Job.getInstance(spark.hadoopConfiguration)
            ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
            AvroParquetOutputFormat.setSchema(job, m.runtimeClass.newInstance().asInstanceOf[IndexedRecord].getSchema())
            keyedRecords.saveAsNewAPIHadoopFile(
              path,
              classOf[Void],
              m.runtimeClass.asInstanceOf[Class[T]],
              classOf[ParquetOutputFormat[T]],
              job.getConfiguration
            )
          }

这是一个错误:

Exception in thread "main" java.lang.InstantiationException: org.apache.avro.generic.GenericRecord

我按如下方式调用函数:

val file1: RDD[GenericRecord] = sc.parquetFile[GenericRecord]("/home/abc.parquet")
sc.saveAsParquetFile(file1, "/home/abc/")

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题