spark avro到parquet在数字字段中写入空值

vjhs03f7  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(423)

我将spark数据框保存为一个Parquet文件,并且该数据框包含从avro对象构建的行。准确的代码在这里-https://stackoverflow.com/a/41491999/2440775
我面临的挑战是,当传入数据中缺少整数字段时,我希望能够使用空值。avro似乎允许通过使用联合类型实现这一点,但是当我在avsc中没有指定默认值或指定默认值“null”时,会出现如下错误:

Caused by: org.apache.avro.AvroRuntimeException: Field xxx type:LONG pos:7 not set and has no default value
    at org.apache.avro.generic.GenericData.getDefaultValue(GenericData.java:984)
    at org.apache.avro.data.RecordBuilderBase.defaultValue(RecordBuilderBase.java:135)

Or

Caused by: org.apache.avro.AvroRuntimeException: Field xxx type:UNION pos:7 not set and has no default value
    at org.apache.avro.generic.GenericData.getDefaultValue(GenericData.java:984)
    at org.apache.avro.data.RecordBuilderBase.defaultValue(RecordBuilderBase.java:135)

如果我写一个默认值“0”,it saveAsparequet工作得很好
我还尝试更改avro规范,使其首先具有“null”类型,因为union选择第一个元素的类型。

"type": ["null","long"], "default": null

这会导致以下异常:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long

更改avro模式中long和null的顺序会导致以下异常
原因:org.apache.avro.avrotypeexception:long的非数字默认值:null

daolsyd0

daolsyd01#

我没有这样的解决方案,但找到了一个解决办法。我从avro对象构建行的方法是从avro对象创建一个列表,然后在该列表上执行row.fromseq。解决方法检查默认值0和数据类型int或long。如果是默认值,则添加null。因此,在选择默认值时必须小心。

public static List avroToList(AvroData a) throws UnsupportedEncodingException{
        List l = new ArrayList<>();
        for (Schema.Field f : a.getSchema().getFields()) {
            Object value = a.get(f.name());
            if (value == null) {
                l.add(null);
            }
            else {
                switch (f.schema().getType().getName()){
                    case "union":
                        l.add(value.toString());
                        break;

                    case "int":
                        if(value == 0) {l.add(null);}
                        else {l.add(Integer.valueOf(value.toString()));}
                      break;

                    case "long":
                        if(value == 0L) {l.add(null);}
                        else {l.add(Long.valueOf(value.toString()));}
                        break;

                    default:l.add(value);
                        break;
                }

            }
        }
        return l;
    }

avsc文件的类型信息如下

"type": "long",  "default": 0

相关问题