(编辑:为更好地反映意图而进行的轻微编辑,但由于取得了进展而进行的大量编辑。)
一个主题 "t_raw"
给定了多种类型的消息,其中它们都包含一个公共 "type"
密钥:
{"type":"key1","data":{"ts":"2018-11-20 19:20:21.1","a":1,"b":"hello"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:22.2","a":1,"c":11,"d":"goodbye"}}
{"type":"key1","data":{"ts":"2018-11-20 19:20:23.3","a":2,"b":"hello2"}}
{"type":"key2","data":{"ts":"2018-11-20 19:20:24.4","a":3,"c":22,"d":"goodbye2"}}
最终,我需要将其拆分为其他流,在这些流中它们将被切碎/聚合/处理。我希望能够使用 STRUCT
但我目前的努力让我做到了:
create stream raw (type varchar, data varchar) \
with (kafka_topic='t_raw', value_format='JSON');
第一级,那么
create stream key1 with (TIMESTAMP='ts', timestamp_format='yyyy-MM-dd HH:mm:ss.S') as \
select \
extractjsonfield(data, '$.ts') as ts, \
extractjsonfield(data, '$.a') as a, extractjsonfield(data, '$.b') as b \
from raw where type='key1';
create stream key2 with (TIMESTAMP='ts', timestamp_format='yyyy-MM-dd HH:mm:ss.S') as \
select \
extractjsonfield(data, '$.ts') as ts, \
extractjsonfield(data, '$.a') as a, extractjsonfield(data, '$.c') as c, \
extractjsonfield(data, '$.d') as d \
from raw where type='key2';
这似乎是可行的,但最近增加了 STRUCT
,有没有办法用它来代替 extractjsonfield
如上所述?
ksql> select * from key1;
1542741621100 | null | 2018-11-20 19:20:21.1 | 1 | hello
1542741623300 | null | 2018-11-20 19:20:23.3 | 2 | hello2
^CQuery terminated
ksql> select * from key2;
1542741622200 | null | 2018-11-20 19:20:22.2 | 1 | 11 | goodbye
1542741624400 | null | 2018-11-20 19:20:24.4 | 3 | 22 | goodbye2
如果没有 STRUCT
,有没有一个直接的方法来做这与香草Kafka流(副 ksql
,因此是apache kafka streams标签)?
有没有一种更Kafka式的/高效的/优雅的方式来解析这个问题?我不能把它定义为空的 STRUCT<>
```
ksql> CREATE STREAM some_input ( type VARCHAR, data struct<> )
WITH (KAFKA_TOPIC='t1', VALUE_FORMAT='JSON');
line 1:52: extraneous input '<>' expecting {',', ')'}
有一些(不是最近的)关于能够做一些类似的事情的讨论
CREATE STREAM key1 ( a INT, b VARCHAR ) AS
SELECT data->* from some_input where type = 'key1';
仅供参考:上述解决方案在confluent-5.0.0中不起作用,最近的一个补丁修复了这个问题 `extractjsonfield` 错误并启用此解决方案。
实际数据有几种更相似的消息类型。它们都包含 `"type"` 以及 `"data"` 键(没有其他顶级键),而且几乎所有键都有 `"ts"` 嵌套在中的等效时间戳 `"data"` .
1条答案
按热度按时间xpcnnkqh1#
是的,你可以这样做-如果一个列不存在,ksql不介意,你只要得到一个
null
价值观。测试数据设置
将一些测试数据填充到主题中:
将主题转储到ksql控制台进行检查:
为数据源流建模
在上面创建一个流。注意使用
STRUCT
以及所有可能的列的引用:将offset设置为earliest,以便查询整个主题,然后使用ksql访问整个流:
使用
->
访问嵌套元素的运算符:将数据保存在单独的Kafka主题中:
用分离的数据填充目标主题:
新流的架构:
支持每个ksql流的主题: