我有一个KSQL流,我想用模式中指定的默认值填充它。除了用coalesce语句手动指定它们之外,有什么方法可以做到这一点吗?
前面回答的问题与我面临的问题类似,但它没有解决使用模式中已指定的默认值的主要问题:
Create KSQL stream with default values for a column?
我进行了以下操作(基于Confluent提供的文档:https://docs.confluent.io/platform/current/模式注册表/serdes-develope/serdes-avro.html#模式引用在avro中):
1.创建了一个带有模式的主题t1-a
kafka-avro-console-producer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic t1-a \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string","default":"no-name"}]}
1.将主题的兼容性设置为FULL(使用schema-registry REST API)
1.使用CLI工具生成主题记录
{"name":"john"}
{"name":"doe"}
1.使用Schema-registry更新了模式
kafka-avro-console-producer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic t1-a \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string", "default":"no-name"}, {"name":"age","type":"string", "default":"ageless-wonder"}]}
1.使用CLI工具生成的主题记录:
{"name":"jack", "age":"100"}
{"name":"jill", "age":"101"}
1.已启动ksql cli并创建了一个流
CREATE STREAM t1_a WITH (KAFKA_TOPIC='t1-a',VALUE_FORMAT='AVRO');
1.查询记录:
SELECT * FROM t1_a;
现在我得到了记录,但是John和Doe的Age值被列为null(而不是模式中指定的默认值“ageless-wonder”):
NAME AGE
john null
doe null
jack 100
jill 101
我知道我可以在流定义中将这些值合并为默认值,但是有没有办法根据已经提供的模式填充该字段呢?
1条答案
按热度按时间guykilcj1#
Ksql使用最初与数据一起发送的模式ID(而不是最新的)来反序列化记录并构建每一行,您需要定义第一个具有默认年龄的模式,然后不使用该模式发送。