CREATE TABLE user_log (
data ROW(id String,user_id String,class_id String)
) WITH (
'connector.type' = 'kafka',
...
);
INSERT INTO sink
SELECT * FROM user_log as tab,
LATERAL TABLE(splitUdtf(tab.data)) AS T(a,b,c);
udtf代码:
public void eval(Row data) {...}
eval方法只能传递行类型参数吗?我想在sql中得到行的键,比如id,user\u id,class\u id,但是在java中,行的键是index(比如0,1,2),我该怎么做?谢谢您!
1条答案
按热度按时间sh7euo9m1#
您的sql是否能够直接将kafka数据转换为表行?也许不是。row是数据流级别的类型,而不是tableapi&sql中的类型。
如果您从kafka收到的数据是json格式的,那么您可以使用fllinksql中的ddl语句,或者使用connector api直接提取json中的字段,只要您的json是key-value格式的。