How to create ksqlDB stream fields from a nested JSON object

jk9hmnmh posted on 2021-06-04 in Kafka

I have a topic to which I send JSON in the following format:

{
  "schema": {
   "type": "string",
   "optional": true
  },
  "payload": "CustomerData{version='1', customerId='76813432', phone='76813432'}"
}

I want to create a stream with customerId and phone, but I don't know how to define the stream's fields from a nested JSON object. (edited)

CREATE  STREAM customer (
    payload.version VARCHAR,
    payload.customerId VARCHAR,
    payload.phone VARCHAR
  ) WITH (
    KAFKA_TOPIC='customers',
    VALUE_FORMAT='JSON'
  );

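Would it look like that? How do I dereference nested objects when defining the stream's fields?
In fact, the statement above does not work as a field definition; it fails with: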

Caused by: line 2:12: 
extraneous input '.' expecting {'EMIT', 'CHANGES',
'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY',

Answer #1 (oyjwcjzk)

Apply the EXTRACTJSONFIELD function

You can use a ksqlDB function called EXTRACTJSONFIELD.
First, declare the schema and payload fields as strings:

CREATE STREAM customer (
  schema VARCHAR,
  payload VARCHAR
) WITH (
    KAFKA_TOPIC='customers',
    VALUE_FORMAT='JSON'
);

Then you can select nested fields from the JSON:

SELECT EXTRACTJSONFIELD(payload, '$.version') AS version FROM customer EMIT CHANGES;
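To make the idea of "extract a field from a JSON string by path" concrete, here is a minimal Python sketch. It is not ksqlDB's implementation: the helper name extract_json_field is hypothetical, it only understands paths of the form '$.a.b' (dotted object keys, no arrays or wildcards), and returning None for invalid JSON is a choice of this sketch, not a documented ksqlDB behavior.

```python
import json
from typing import Any, Optional


def extract_json_field(json_string: str, path: str) -> Optional[str]:
    # Hypothetical helper, loosely modeled on EXTRACTJSONFIELD for simple paths:
    # only '$' followed by dotted object keys (e.g. '$.payload.version') is handled.
    if not path.startswith("$"):
        raise ValueError("path must start with '$'")
    try:
        node: Any = json.loads(json_string)
    except json.JSONDecodeError:
        # This sketch returns None for text that is not valid JSON.
        return None
    for key in (k for k in path[1:].split(".") if k):
        if not isinstance(node, dict) or key not in node:
            return None  # missing key -> None, analogous to a NULL result
        node = node[key]
    if node is None:
        return None
    # Strings come back as-is; numbers, booleans, objects and arrays as JSON text.
    return node if isinstance(node, str) else json.dumps(node)


print(extract_json_field('{"version": "1", "customerId": "76813432"}', "$.version"))  # 1
```

The point of the sketch is that path extraction only works when the string itself is a JSON document; a string that is not JSON has no fields to walk.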

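However, your payload does not appear to be valid JSON: it is a plain string such as CustomerData{version='1', customerId='76813432', phone='76813432'} rather than a JSON object, so EXTRACTJSONFIELD has no JSON fields to extract from it.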
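A quick way to confirm this outside ksqlDB is to parse the message with Python's standard json module. The sketch below assumes the curly quotes in the question are a copy/paste artifact and that the producer actually sent plain single quotes inside the payload string; the helper name is_json_object is hypothetical.

```python
import json

# Assumption: the curly quotes in the question are a copy/paste artifact and the
# producer actually sent plain single quotes inside the payload string.
payload = "CustomerData{version='1', customerId='76813432', phone='76813432'}"
envelope = json.dumps({"schema": {"type": "string", "optional": True}, "payload": payload})


def is_json_object(text: str) -> bool:
    """Return True if text parses as a JSON object (a dict in Python)."""
    try:
        return isinstance(json.loads(text), dict)
    except json.JSONDecodeError:
        return False


outer = json.loads(envelope)
print(is_json_object(envelope))          # True: the outer message is JSON
print(type(outer["payload"]).__name__)   # str: payload is just a string
print(is_json_object(outer["payload"]))  # False: that string is not JSON
```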

Apply a STRUCT schema

If instead your payload were encoded as a nested JSON object, meaning your data looked like this:

{
  "schema": {
   "type": "string",
   "optional": true
  },
  "payload": {
    "version": "1",
    "customerId": "76813432",
    "phone": "76813432"
  }
}
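As a sanity check (a Python sketch using only the standard json module, not part of the original answer), the nested form parses so that "payload" becomes an object, which is the shape a STRUCT column describes. Note that JSON requires ":" between keys and values; a "key"="value" form is rejected by a JSON parser.

```python
import json

# The message above, written as valid JSON (":" between keys and values).
message_text = """
{
  "schema": {"type": "string", "optional": true},
  "payload": {"version": "1", "customerId": "76813432", "phone": "76813432"}
}
"""

message = json.loads(message_text)
payload = message["payload"]
print(type(payload).__name__)  # dict
print(payload["customerId"])   # 76813432

# A "key"="value" form is rejected by a JSON parser.
try:
    json.loads('{"payload": {"version"="1"}}')
    print("parsed")
except json.JSONDecodeError:
    print("not valid JSON")  # this branch runs
```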

you can define the STRUCT fields as follows:

CREATE STREAM customer (
  schema STRUCT<
    type VARCHAR,
    optional BOOLEAN>,
  payload STRUCT<
    version VARCHAR,
    customerId VARCHAR,
    phone VARCHAR>
) 
WITH (
    KAFKA_TOPIC='customers',
    VALUE_FORMAT='JSON'
);

Finally, you can reference the individual fields like this:

CREATE STREAM customer_analysis AS
SELECT
  payload->version as VERSION,
  payload->customerId as CUSTOMER_ID,
  payload->phone as PHONE
FROM customer
EMIT CHANGES;
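For readers who think in terms of ordinary code, the query above roughly corresponds to mapping each message to one flat row. This is only a conceptual Python analogue, not how ksqlDB executes the query; the function name customer_analysis and the choice to emit None when the payload is not an object are assumptions of this sketch.

```python
import json
from typing import Dict, Iterable, Iterator, Optional


def customer_analysis(messages: Iterable[str]) -> Iterator[Dict[str, Optional[str]]]:
    """Rough analogue of the customer_analysis query: one flat row per message."""
    for text in messages:
        payload = json.loads(text).get("payload")
        if not isinstance(payload, dict):
            payload = {}  # no nested object: every column becomes None
        yield {
            "VERSION": payload.get("version"),         # payload->version
            "CUSTOMER_ID": payload.get("customerId"),  # payload->customerId
            "PHONE": payload.get("phone"),             # payload->phone
        }


sample = '{"payload": {"version": "1", "customerId": "76813432", "phone": "76813432"}}'
for row in customer_analysis([sample]):
    print(row)
```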
