如何在Flink SQL中使用ROW类型列中的字段?

hgncfbus  于 2022-12-16  发布在  Apache
关注(0)|答案(1)|浏览(229)

我正在Flink中执行一个SQL,如下所示:

create table team_config_source (
      `payload` ROW(
        `before` ROW(
          team_config_id int,
          ...
        ),
        `after` ROW(
          team_config_id int,
          ...
        )
      ),
      PRIMARY KEY (`payload`.`after`.`team_config_id`) NOT ENFORCED
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'xxx',
    'properties.bootstrap.servers' = 'xxx',
    'properties.group.id' = 'xxx',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'key.format' = 'json'
    )

但Flink给予我这个错误:

org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "." at line 51, column 29.
Was expecting one of:
     ")" ...
     "," ...

我也试过用(payload.after.team_config_id)替换(payload.after.team_config_id),但是Flink会说column payload.after.team_config_id was not defined
我应该如何纠正我的DDL?

ar7v8xwq

ar7v8xwq1#

我跳过了这个问题,省略了PRIMARY KEY语句。

相关问题