我有一个主题,我正在发送json,格式如下:
{
"schema": {
"type": "string",
"optional": true
},
"payload": “CustomerData{version='1', customerId=‘76813432’, phone=‘76813432’}”
}
我想用customerid和phone创建一个流,但我不知道如何用嵌套的json对象定义流((已编辑)
CREATE STREAM customer (
payload.version VARCHAR,
payload.customerId VARCHAR,
payload.phone VARCHAR
) WITH (
KAFKA_TOPIC='customers',
VALUE_FORMAT='JSON'
);
会是那样吗?如何在定义streams字段时取消引用嵌套对象?
实际上,上面所说的不适用于字段定义:
Caused by: line 2:12:
extraneous input '.' expecting {'EMIT', 'CHANGES',
'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY',
1条答案
按热度按时间oyjwcjzk1#
应用函数extractjsonfield
您可以使用一个名为extractjsonfield的ksqldb函数。
首先,需要提取schema和payload字段:
然后可以选择json中的嵌套字段:
但是,您的负载数据似乎没有有效的json格式。
应用结构模式
如果您的整个负载被编码为json字符串,这意味着您的数据如下所示:
您可以定义struct如下:
最后,引用单个字段可以这样做: