Exception when serializing Avro data with a schema

Posted by whlutmcx on 2021-06-06 in Kafka

I have registered a schema in the Schema Registry under the subject named physics.

curl -X POST \
  http://localhost:8081/subjects/physics/versions \
  -H 'accept: application/vnd.schemaregistry.v1+json' \
  -H 'cache-control: no-cache' \
  -H 'content-type: application/vnd.schemaregistry.v1+json' \
  -H 'postman-token: 5ce4bab3-6d0c-2940-d7dd-8f901b0b6fec' \
  -d '{"schema":"{\"type\":\"record\",\"name\":\"physics\",\"fields\":[{\"name\":\"ATTRIBUTE1\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"ATTRIBUTE2\",\"type\":\"long\",\"default\":0},{\"name\":\"ATTRIBUTE3\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"ATTRIBUTE4\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"ATTRIBUTE5\",\"type\":\"string\",\"default\":\"\"}]}"}'

My Kafka producer code looks like this.

// Fetch the schema registered under the subject physics.
SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient("http://localhost:8081", 20);
Schema schema = schemaRegistryClient.getBySubjectAndId("physics", ID);

Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(kafkaProps);

// Only three of the five fields are set; ATTRIBUTE4 and ATTRIBUTE5 are left unset.
GenericRecord inputRecord = new GenericData.Record(schema);
inputRecord.put("ATTRIBUTE1", "val1");
inputRecord.put("ATTRIBUTE2", System.currentTimeMillis());
inputRecord.put("ATTRIBUTE3", "val3");

// Send synchronously so that any serialization error surfaces immediately.
ProducerRecord<String, GenericRecord> recordData = new ProducerRecord<String, GenericRecord>(topic,
        subjectName, inputRecord);
producer.send(recordData).get();

Compatibility: BACKWARD
Serializer: KafkaAvroSerializer

When I run the Kafka producer I get the following exception.

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException: null of string in field ATTRIBUTE4 of physics
    at org.apache.avro.generic.GenericDatumWriter.npe(GenericDatumWriter.java:145)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:139)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:92)
    at kafka.MyKafkaAvroSerializer.serialize(MyKafkaAvroSerializer.java:27)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:453)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353)
    at kafka.MyAvroProducerV3.main(MyAvroProducerV3.java:62)
Caused by: java.lang.NullPointerException
    at org.apache.avro.io.Encoder.writeString(Encoder.java:121)
    at org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:267)
    at org.apache.avro.generic.GenericDatumWriter.writeString(GenericDatumWriter.java:262)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:128)
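
The trace fails inside Avro's GenericDatumWriter, below the Confluent serializer, which suggests the error can be reproduced without Kafka or the Schema Registry. Below is a minimal sketch under that assumption. The class name NpeRepro and the method unsetFieldThrowsNpe are hypothetical, and the schema is a trimmed two-field copy of the registered one.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class NpeRepro {
    // Assumption: trimmed copy of the registered schema, two string fields with "" defaults.
    static final String SCHEMA_JSON =
        "{\"type\":\"record\",\"name\":\"physics\",\"fields\":["
        + "{\"name\":\"ATTRIBUTE1\",\"type\":\"string\",\"default\":\"\"},"
        + "{\"name\":\"ATTRIBUTE4\",\"type\":\"string\",\"default\":\"\"}]}";

    // Returns true if writing a record whose ATTRIBUTE4 was never set throws an NPE.
    static boolean unsetFieldThrowsNpe() throws IOException {
        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
        GenericRecord record = new GenericData.Record(schema);
        record.put("ATTRIBUTE1", "val1"); // ATTRIBUTE4 stays null despite its declared default

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        try {
            new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
            encoder.flush();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("NPE on unset field: " + unsetFieldThrowsNpe());
    }
}
```

If this reproduces the failure, the cause is the null field value in the record itself, not the registry or the compatibility setting.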

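If I send an extra attribute from the Kafka producer, namely ATTRIBUTE6, an exception is thrown saying that ATTRIBUTE6 is not registered. That is expected.
However, since I have registered 5 attributes (ATTRIBUTE1, ATTRIBUTE2, ATTRIBUTE3, ATTRIBUTE4, ATTRIBUTE5), if I send only 3 of them (i.e. ATTRIBUTE1, ATTRIBUTE2, ATTRIBUTE3), I would expect the consumer to receive those 3 attributes plus default values (e.g. -1) for the missing ones. Why do I get the exception above instead?
Am I missing something?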
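
One likely explanation: new GenericData.Record(schema) does not populate fields with the schema's defaults, so ATTRIBUTE4 and ATTRIBUTE5 are null when the writer reaches them. Avro applies defaults when a reader resolves a writer schema that lacks a field, and GenericRecordBuilder.build() also fills them in for fields that were not set. The declared defaults here are "" and 0, so -1 would not appear. Below is a minimal sketch of the builder approach. The class name DefaultsSketch and the method buildRecord are hypothetical, and the schema is parsed from an inline copy of the registered JSON; it is not fetched from the registry.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

public class DefaultsSketch {
    // Inline copy of the schema registered under the subject physics.
    static final String SCHEMA_JSON =
        "{\"type\":\"record\",\"name\":\"physics\",\"fields\":["
        + "{\"name\":\"ATTRIBUTE1\",\"type\":\"string\",\"default\":\"\"},"
        + "{\"name\":\"ATTRIBUTE2\",\"type\":\"long\",\"default\":0},"
        + "{\"name\":\"ATTRIBUTE3\",\"type\":\"string\",\"default\":\"\"},"
        + "{\"name\":\"ATTRIBUTE4\",\"type\":\"string\",\"default\":\"\"},"
        + "{\"name\":\"ATTRIBUTE5\",\"type\":\"string\",\"default\":\"\"}]}";

    // Sets three fields explicitly; build() fills the remaining two from the schema defaults.
    static GenericRecord buildRecord(long timestamp) {
        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
        return new GenericRecordBuilder(schema)
            .set("ATTRIBUTE1", "val1")
            .set("ATTRIBUTE2", timestamp)
            .set("ATTRIBUTE3", "val3")
            .build();
    }

    public static void main(String[] args) {
        GenericRecord record = buildRecord(System.currentTimeMillis());
        System.out.println(record);
    }
}
```

In the producer above, the record returned by build() could be passed to ProducerRecord in place of inputRecord. Calling inputRecord.put("ATTRIBUTE4", "") and inputRecord.put("ATTRIBUTE5", "") explicitly should have the same effect.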
