在反序列化(加载到配置单元表中)以avro模式支持的Parquet格式编写的小数时发生classcastexception

9wbgstp7  于 2021-06-26  发布在  Hive
关注(0)|答案(1)|浏览(352)

我正在尝试使用avro模式(avro-backed)将csv数据序列化为parquet格式&再次将其读入配置单元表。
正在使用以下示例代码段(用于序列化单个记录的示例代码)成功地序列化此记录:

import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MesspidType;

public class AvroParquetConverter {

    public static void main(String[] args) throws IOException {
        Schema avroSchema = new Schema.Parser().parse(new File("schema.avsc"));
        GenericRecord myrecord = new GenericData.Record(avroSchema);
        String outputFilename = "/home/jai/sample1000-snappy.parquet";
        Path outputPath = new Path(outputFilename);
        MesspidType parquetSchema = new AvroSchemaConverter()
                .convert(avroSchema);
        AvroWriteSupport writeSupport = new AvroWriteSupport(parquetSchema,
                avroSchema);
        CompressionCodecName compressionCodecSnappy = CompressionCodecName.SNAPPY;
        int blockSize = 256 * 1024 * 1024;
        int ppidSize = 64 * 1024;

        ParquetWriter parquetWriterSnappy = new ParquetWriter(outputPath,
                writeSupport, compressionCodecSnappy, blockSize, ppidSize);
        BigDecimal bd = new BigDecimal(20);
        GenericRecord myrecordTemp = new GenericData.Record(avroSchema);
        myrecord.put("name", "Abhijeet1");
        myrecord.put("pid", 20);
        myrecord.put("favorite_number", 22);
        String bd1 = "13.5";
        BigDecimal bdecimal = new BigDecimal(bd1);
        bdecimal.setScale(15, 6);
        BigInteger bi = bdecimal.unscaledValue();
        byte[] barray = bi.toByteArray();
        ByteBuffer byteBuffer = ByteBuffer.allocate(barray.length);
        byteBuffer.put(barray);
        byteBuffer.rewind();
        myrecord.put("price", byteBuffer);
        parquetWriterSnappy.write(myrecord);
        parquetWriterSnappy.close();
    }
}

还可以使用以下语句完成十进制到字节缓冲区的转换:

ByteBuffer.wrap(bdecimal.unscaledValue().toByteArray());

下面是avro模式文件

{
    "namespace": "avropoc",
    "type": "record",
    "name": "User",
    "fields": [
             {"name": "name", "type": "string", "default" : "null"},
             {"name": "favorite_number",  "type": "int", "default": 0 },
             {"name": "pid",  "type":"int", "default" : 0 },
             {"name": "price", "type": {"type" : "bytes","logicalType":"decimal","precision":15,"scale":6}, "default" : 0 }
     ]
}

还尝试对架构进行以下修改:

{"name": "price", "type": "bytes","logicalType":"decimal","precision":15,"scale":6, "default" : 0 }

我正在创建配置单元表,如下所示:

create external table avroparquet1
( name string, favorite_number int,
pid int, price DECIMAL(15,6))
STORED AS PARQUET;

但是,当我运行十进制字段价格查询时,会收到以下错误消息:
失败,出现异常java.io.ioexception:org.apache.hadoop.hive.ql.metadata.hiveexception:java.lang.classcastexception:org.apache.hadoop.io.byteswritable不能强制转换为org.apache.hadoop.hive.serde2.io.hivedecimalwritable
这看起来像是parquet/avro/hive相关的问题,它不能反序列化小数,在avro的情况下,小数需要写成bytebuffer。
我已经在avro 1.8.0、parquet 1.8.1和hive 1.1.0上试过了。
任何帮助都将不胜感激。

vulvrdjw

vulvrdjw1#

hive为十进制(22,7)生成的实际模式——使用linkedin实用程序派生的一些代码来检查实际的Parquet文件——看起来像。。。
Parquet语法: optional fixed_len_byte_array(10) my_dec_22_7; avro语法: { "name":"my_dec_22_7","type":["null",{"type":"fixed", "name":"my_dec_22_7","size":10} ], "default":null } …其中10似乎是转储文件所需的字节数 BigInteger 将22位数字转换为 byte[] . 查看avroserdeutils源代码及其转储 HiveDecimal ,例如。
也就是说,我真的不知道如何读/写Parquet文件中的十进制数。double和bigint更易于处理,因为它们有ieee标准类型(以及avro标准类型和parquet标准类型)支持。

相关问题