org.apache.spark.sql.analysisexception:没有这样的结构字段

nx7onnlm  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(720)

我正在使用javaspark读取这样一个Parquet文件

Dataset<MyData> myDataDS = sparkSession.read().parquet(myParquetFile)
                        .as(Encoders.bean(MyData.class));

如果 myParquetFile 模式与类完全一致 MyData 但是,假设我添加了一个新字段,例如。 myId (即使它的值为空)到 MyData 类,然后我需要重新生成parquet文件,否则它将引发如下异常
原因:org.apache.spark.sql.analysisexception:没有这样的结构字段myid
有没有一种方法可以跳过空值以通过此错误而不重新生成Parquet文件?

vmpqdwk3

vmpqdwk31#

用暴力的方法来解决这个问题-

Dataset<Row> parquet = spark.read()
                .parquet(
                        getClass().getResource("/parquet/plain/part-00000-4ece3595-e410-4301-aefd-431cd1debf91-c000.snappy" +
                                ".parquet").getPath()
                );
        parquet.show(false);
        /**
         * +------+
         * |price |
         * +------+
         * |123.15|
         * +------+
         */

        StructType schema = Encoders.bean(MyData.class).schema();
        List<String> columns = Arrays.stream(parquet.columns()).collect(Collectors.toList());
        List<Column> columnList = JavaConverters.asJavaCollectionConverter(schema).asJavaCollection().stream()
                .map(f -> (columns.contains(f.name())) ? col(f.name()) : lit(null).cast(f.dataType()).as(f.name()))
                .collect(Collectors.toList());
        Dataset<MyData> myDataDS =
                parquet.select(JavaConverters.asScalaBufferConverter(columnList).asScala()).as(Encoders.bean(MyData.class));
        myDataDS.show(false);
        myDataDS.printSchema();
        /**
         * +----+------+
         * |myId|price |
         * +----+------+
         * |null|123.15|
         * +----+------+
         *
         * root
         *  |-- myId: string (nullable = true)
         *  |-- price: decimal(5,2) (nullable = true)
         */

我的数据.java

public class MyData {
    private double price;
    private String myId;

    public String getMyId() {
        return myId;
    }

    public void setMyId(String myId) {
        this.myId = myId;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }
}
p5fdfcr1

p5fdfcr12#

在读取parquet时,默认情况下,spark使用parquet文件中包含的模式来读取数据。例如,与avro格式相反,模式在parquet文件中,如果要更改模式,则必须重新生成parquet文件。
但是,您可以向spark提供模式,而不是让spark推断模式 DataFrameReader 使用方法 .schema() . 在这种情况下,spark将忽略parquet文件中定义的模式,并使用您提供的模式。
因此,解决方案是将从casting类中提取的模式传递给spark的 DataFrameReader :

Dataset<MyData> myDataDS = sparkSession.read()
    .schema(Encoders.bean(MyData.class).schema())
    .parquet(myParquetFile)
    .as(Encoders.bean(MyData.class))

这个 AnalysisException 如果不抛出,则会得到一个列“myid”设置为null的数据集。

相关问题