使用ksqlcli,使用kafka主题,其消息是json对象。我希望在不声明详尽的结构或Map字段定义的情况下提取obj.updaterid等字段。
{
"id": "5ba8f7e6b93c7964efb00f48",
"source": "ShareService",
"obj": {
"updaterId": "systems@test.com",
"desc": "foobar",
"name": "com.test.auto.sensor"
}
}
我可以通过多种方式成功创建流,最简单的方法是:
CREATE STREAM objs1 (obj VARCHAR) WITH (kafka_topic='json-topic', value_format='JSON');
简单的选择工作如预期,您可以看到的内容对象。。。
SELECT * FROM objs1;
1537804190394 |“5ba8f7e6b93c7964efb00f48”|{name=com.test.auto.sensor,updaterid=systems@test.com,desc=foobar}
这里不起作用的是使用extractjsonfield从obj中提取json字段的任何尝试。对顶级和嵌套对象的响应都是“null”。
SELECT EXTRACTJSONFIELD(obj, '$.updaterId') AS updater FROM objs1;
无效的
ksql文档中有一个注解,说明如果数据是字符串列中的实际对象,那么我可以使用struct。它并没有说我必须使用struct。
顺便说一句,使用struct确实有效,但是我对extractjsonfield感兴趣,因为我的消息的深层结构会有所不同。换句话说,如果消息不包含深层结构,则有时会出现空响应。
作品:
CREATE STREAM objs1 (obj STRUCT<updaterId VARCHAR>) WITH (kafka_topic='json-topic', value_format='JSON');
SELECT OBJ->updaterId AS updater FROM OBJS1;
我发誓我在别人的问题中看到了其他类似的例子。我错过了什么?
注意:我已经为这篇文章简化了json。它是更大,更嵌套的irl,但这个简单的例子是准确的,我相信。
osx上的ksql 5.0.0版。
2条答案
按热度按时间ve7v8dk21#
ksql问题#1562已经解决,您应该能够使用
EXTRACTJSONFIELD
像以前一样工作。请注意,您现在需要使用主版本的最新版本才能拥有此功能。ezykj2lf2#
在最新版本中,json解析器ksql issuesŠ1562中似乎有一个实质性的变化,它去掉了varchar列中json内容的引号(从我的示例中可以看到),这导致json解析器找不到该字段名。
这个问题建议使用struct而不是extractjsonfield(就像我上面的工作示例一样)。
这似乎不适合我的用例,因为我的深域名称可能会改变。将对此进行更多的研究和更新。