flink sql:udtf传递行类型参数

ctehm74n  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(392)
CREATE TABLE user_log (
    data ROW(id String,user_id String,class_id String)
) WITH (
    'connector.type' = 'kafka',
    ...
);

INSERT INTO sink
SELECT * FROM user_log as tab,
LATERAL TABLE(splitUdtf(tab.data)) AS T(a,b,c);

udtf代码:

public void eval(Row data) {...}

eval方法只能传递行类型参数吗?我想在sql中得到行的键,比如id,user\u id,class\u id,但是在java中,行的键是index(比如0,1,2),我该怎么做?谢谢您!

sh7euo9m

sh7euo9m1#

您的sql是否能够直接将kafka数据转换为表行?也许不是。row是数据流级别的类型,而不是tableapi&sql中的类型。
如果您从kafka收到的数据是json格式的,那么您可以使用fllinksql中的ddl语句,或者使用connector api直接提取json中的字段,只要您的json是key-value格式的。

相关问题