为什么kafka avro console producer不遵守字段的默认值?

jbose2ul  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(464)

虽然为字段定义了默认值, kafka-avro-console-producer 完全忽略它:

$ kafka-avro-console-producer --broker-list localhost:9092 --topic test-avro \
--property schema.registry.url=http://localhost:8081 --property \
value.schema='{"type":"record","name":"myrecord1","fields": \
[{"name":"f1","type":"string"},{"name": "f2", "type": "int", "default": 0}]}'

{"f1": "value1"}

org.apache.kafka.common.errors.SerializationException: Error 
deserializing json {"f1": "value1"} to Avro of schema 
{"type":"record","name":"myrecord1","fields": 
[{"name":"f1","type":"string"},{"name":"f2","type":"int","default":0}]}
Caused by: org.apache.avro.AvroTypeException: Expected int. Got END_OBJECT
    at org.apache.avro.io.JsonDecoder.error(JsonDecoder.java:698)
    at org.apache.avro.io.JsonDecoder.readInt(JsonDecoder.java:172)
    at org.apache.avro.io.ValidatingDecoder.readInt(ValidatingDecoder.java:83)
    at org.apache.avro.generic.GenericDatumReader.readInt(GenericDatumReader.java:511)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:182)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
    at io.confluent.kafka.formatter.AvroMessageReader.jsonToAvro(AvroMessageReader.java:213)
    at io.confluent.kafka.formatter.AvroMessageReader.readMessage(AvroMessageReader.java:180)
    at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:54)
    at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)

那么如何使用它来接受默认值呢?顶层配置设置为 "BACKWARD" 兼容性级别检查,尽管我不认为这与问题有关。这个模式是版本2,版本1只定义了f1字段,但正如我所说的,我认为这无关紧要。

cbeh67ev

cbeh67ev1#

如avro规范所述 default :此字段的默认值,在读取缺少此字段的示例时使用
因此,生产商仍然需要供应该油田。
我不确定在使用avro控制台producer时是否可以完全排除字段,因为即使将字段设置为giorgos所示的可空字段,仍然需要显式地设置它。

wvt8vs2t

wvt8vs2t2#

该错误指示消息与您定义的avro架构不兼容。据我所知,你想允许 null 字段的值 f2 . 要做到这一点,你需要改变你的想法 value.schema 注意…的定义 "type" ):

value.schema='{"type":"record","name":"myrecord1","fields": [{"name":"f1","type":"string"},{"name": "f2", "type": ["null", "int"], "default": 0}]}'

但你仍然需要定义 f2 具有空值的键。以下内容可以帮您解决问题:

kafka-avro-console-producer --broker-list localhost:9092 --topic test-avro \ 
    --property schema.registry.url=http://localhost:8081 \ 
    --property value.schema='{"type":"record","name":"myrecord1","fields": [{"name":"f1","type":"string"},{"name": "f2", "type": ["null", "int"], "default": 0}]}'

{"f1":"value1","f2":null}

你可以确认这一点 kafka-avro-console-consumer :

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test-avro --from-beginning
{"f1":"value1","f2":null}
^CProcessed a total of 1 messages

相关问题