我正在尝试使用Apache Flink 1.11创建一个源表,以便访问JSON消息中的嵌套属性。我可以从根属性中提取值,但不确定如何访问嵌套对象。
文档建议它应该是MAP
类型,但是当我设置它时,我得到了以下错误
: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP
下面是我的SQL
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties MAP
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
)
我的JSON看起来像这样:
{
"id": "message-1",
"title": "Some Title",
"properties": {
"foo": "bar"
}
}
4条答案
按热度按时间v8wbuo2f1#
可以使用
ROW
来提取JSON消息中的嵌套字段。DDL语句如下所示:w51jfk4q2#
[2022年更新]
在Apache Flink 1.13版本中没有系统内置JSON函数。它们在1.14版本中引入。检查this
如果您使用的是版本〈1.14,请参阅下面的解决方案。
如何使用嵌套JSON输入创建表?
JSON输入示例:
创建语句
如何选择嵌套列?
请注意,如果使用输出结果
您将看到行格式的结果。列
properties
的内容将是abithluo3#
你也可以试试
唯一的区别是:
MAP<STRING, STRING>
与MAP
wvmv3b1j4#
如果使用
format=raw
,您可以使用JSON_VALUE
函数从payload
中提取感兴趣的字段:下面是代码: