Kafka KSQL可以用模式中提供的默认值填充流吗?

b4wnujal  于 2022-12-17  发布在  Apache
关注(0)|答案(1)|浏览(105)

我有一个KSQL流,我想用模式中指定的默认值填充它。除了用coalesce语句手动指定它们之外,有什么方法可以做到这一点吗?
前面回答的问题与我面临的问题类似,但它没有解决使用模式中已指定的默认值的主要问题:
Create KSQL stream with default values for a column?
我进行了以下操作(基于Confluent提供的文档:https://docs.confluent.io/platform/current/模式注册表/serdes-develope/serdes-avro.html#模式引用在avro中):
1.创建了一个带有模式的主题t1-a

kafka-avro-console-producer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic t1-a \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string","default":"no-name"}]}

1.将主题的兼容性设置为FULL(使用schema-registry REST API)
1.使用CLI工具生成主题记录

{"name":"john"}
{"name":"doe"}

1.使用Schema-registry更新了模式

kafka-avro-console-producer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --topic t1-a \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"name","type":"string", "default":"no-name"}, {"name":"age","type":"string", "default":"ageless-wonder"}]}

1.使用CLI工具生成的主题记录:

{"name":"jack", "age":"100"}
{"name":"jill", "age":"101"}

1.已启动ksql cli并创建了一个流

CREATE STREAM t1_a WITH (KAFKA_TOPIC='t1-a',VALUE_FORMAT='AVRO');

1.查询记录:

SELECT * FROM t1_a;

现在我得到了记录,但是John和Doe的Age值被列为null(而不是模式中指定的默认值“ageless-wonder”):

NAME    AGE
john    null
doe     null
jack    100
jill    101

我知道我可以在流定义中将这些值合并为默认值,但是有没有办法根据已经提供的模式填充该字段呢?

guykilcj

guykilcj1#

Ksql使用最初与数据一起发送的模式ID(而不是最新的)来反序列化记录并构建每一行,您需要定义第一个具有默认年龄的模式,然后不使用该模式发送。

相关问题