使用Apache Flink SQL从Kafka消息中获取嵌套字段

balp4ylt  于 2023-02-27  发布在  Apache
关注(0)|答案(4)|浏览(298)

我正在尝试使用Apache Flink 1.11创建一个源表,以便访问JSON消息中的嵌套属性。我可以从根属性中提取值,但不确定如何访问嵌套对象。
文档建议它应该是MAP类型,但是当我设置它时,我得到了以下错误

: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP

下面是我的SQL

CREATE TABLE input(
            id VARCHAR,
            title VARCHAR,
            properties MAP
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        )

我的JSON看起来像这样:

{
  "id": "message-1",
  "title": "Some Title",
  "properties": {
    "foo": "bar"
  }
}
v8wbuo2f

v8wbuo2f1#

可以使用ROW来提取JSON消息中的嵌套字段。DDL语句如下所示:

CREATE TABLE input(
             id VARCHAR,
             title VARCHAR,
             properties ROW(`foo` VARCHAR)
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        );
w51jfk4q

w51jfk4q2#

[2022年更新]

在Apache Flink 1.13版本中没有系统内置JSON函数。它们在1.14版本中引入。检查this
如果您使用的是版本〈1.14,请参阅下面的解决方案。

如何使用嵌套JSON输入创建表?

JSON输入示例:

{
    "id": "message-1",
    "title": "Some Title",
    "properties": {
    "foo": "bar"
    "nested_foo":{
        "prop1" : "value1",
        "prop2" : "value2"
    }
}

创建语句

CREATE TABLE input(
                id VARCHAR,
                title VARCHAR,
                properties ROW(`foo` VARCHAR, `nested_foo` ROW(`prop1` VARCHAR, `prop2` VARCHAR))
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        );

如何选择嵌套列?

SELECT properties.foo, properties.nested_foo.prop1 FROM input;

请注意,如果使用输出结果

SELECT properties FROM input

您将看到行格式的结果。列properties的内容将是

+I[bar, +I[prop1,prop2]]
abithluo

abithluo3#

你也可以试试

CREATE TABLE input(
            id VARCHAR,
            title VARCHAR,
            properties MAP<STRING, STRING>
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        )

唯一的区别是:MAP<STRING, STRING>MAP

wvmv3b1j

wvmv3b1j4#

如果使用format=raw,您可以使用JSON_VALUE函数从payload中提取感兴趣的字段:下面是代码:

CREATE TABLE input(
        payload STRING,
        foo AS JSON_VALUE(payload, '$.properties.foo' RETURNING STRING),
) WITH (
  'connector' = 'kafka',        
  'format' = 'raw'
)

相关问题