ksql表或流中包含部分原始json消息的字符串字段

vqlkdk9b  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(364)

是否可以将字符串字段添加到ksql表/流中,该表/流将包含原始消息的json的一部分。
例如,
原始消息:

{userId:12345, 
 service:"service-1", 
 "debug":{
          "msg":"Debug message", 
          "timer": 11.12}
}

所以,我们需要绘制Map userIduserId BIGINT , serviceservice STRING 以及 debugdebug STRING 它将包含 {"msg":"Debug message", "timer": 11.12} 作为字符串。

zpgglvta

zpgglvta1#

是的,你可以简单地将它声明为 VARCHAR . 在这里,您可以将其视为恰好是json的字符串,也可以使用 EXTRACTJSONFIELD 功能。
将示例消息发送到主题:

echo '{"userId":12345, "service":"service-1", "debug":{ "msg":"Debug message", "timer": 11.12} }' | kafkacat -b localhost:9092 -t test_topic -P

声明流:

ksql> CREATE STREAM demo (userid BIGINT, service VARCHAR, debug VARCHAR) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

查询列:

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT USERID, SERVICE, DEBUG FROM demo;
12345 | service-1 | {"msg":"Debug message","timer":11.12}

访问嵌套的json字段:

ksql> SELECT USERID, SERVICE, EXTRACTJSONFIELD(DEBUG,'$.msg') FROM demo;
12345 | service-1 | Debug message

ksql> SELECT USERID, SERVICE, EXTRACTJSONFIELD(DEBUG,'$.timer') FROM demo;
12345 | service-1 | 11.12

相关问题