我正在Flink中执行一个SQL,如下所示:
create table team_config_source (
`payload` ROW(
`before` ROW(
team_config_id int,
...
),
`after` ROW(
team_config_id int,
...
)
),
PRIMARY KEY (`payload`.`after`.`team_config_id`) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'xxx',
'properties.bootstrap.servers' = 'xxx',
'properties.group.id' = 'xxx',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'key.format' = 'json'
)
但Flink给予我这个错误:
org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "." at line 51, column 29.
Was expecting one of:
")" ...
"," ...
我也试过用(
payload.after.team_config_id)
替换(
payload.
after.
team_config_id)
,但是Flink会说column payload.after.team_config_id was not defined
。
我应该如何纠正我的DDL?
1条答案
按热度按时间ar7v8xwq1#
我跳过了这个问题,省略了
PRIMARY KEY
语句。