flink版本:1.3.1
我创建了两个表,一个来自内存,另一个来自udtf。当我测试join和left join时,它们返回相同的结果。我所期望的是left join比join有更多的行。
我的测试代码是:
public class ExerciseUDF {
public static void main(String[] args) throws Exception {
test_3();
}
public static void test_3() throws Exception {
// 1. set up execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
DataSet<WC> input = env.fromElements(
new WC("Hello", 1),
new WC("Ciao", 1),
new WC("Hello", 1));
// 2. register the DataSet as table "WordCount"
tEnv.registerDataSet("WordCount", input, "word, frequency");
Table table;
DataSet<WC> result;
DataSet<WCUpper> resultUpper;
table = tEnv.scan("WordCount");
// 3. table left join user defined table
System.out.println("table left join user defined table");
tEnv.registerFunction("myTableUpperFunc",new MyTableFunc_2());
table = tEnv.sql("SELECT S.word as word, S.frequency as frequency, S.word as myupper FROM WordCount as S left join LATERAL TABLE(myTableUpperFunc(S.word)) as T(word,myupper) on S.word = T.word");
resultUpper = tEnv.toDataSet(table, WCUpper.class);
resultUpper.print(); // out put —— WCUpper Ciao 1 CIAO, however, without the row having Hello
// 4. table join user defined table
System.out.println("table join user defined table");
tEnv.registerFunction("myTableUpperFunc",new MyTableFunc_2());
table = tEnv.scan("WordCount");
table = tEnv.sql("SELECT S.word as word, S.frequency as frequency, T.myupper as myupper FROM WordCount as S join LATERAL TABLE(myTableUpperFunc(S.word)) as T(word,myupper) on S.word = T.word"
);
resultUpper = tEnv.toDataSet(table, WCUpper.class);
resultUpper.print();
}
public static class WC {
public String word;
public long frequency;
// public constructor to make it a Flink POJO
public WC() {
}
public WC(String word, long frequency) {
this.word = word;
this.frequency = frequency;
}
@Override
public String toString() {
return "WC " + word + " " + frequency;
}
}
// user defined table function
public static class MyTableFunc_2 extends TableFunction<Tuple2<String,String>>{
public void eval(String str){ // hello --> hello HELLO
System.out.println("upper func executed for "+str);
if(str.equals("Hello")){
return;
}
collect(new Tuple2<String,String>(str,str.toUpperCase()));
// collect(new Tuple2<String,String>(str,str.toUpperCase()));
}
}
}
left join和join查询的输出是相同的。在这两种情况下,只返回一行。
WC上一个ciao 1个ciao
但是,我认为left join查询应该保留“hello”行。
1条答案
按热度按时间soat7uwm1#
是的,你说得对。
这是tablefunction外部连接与 predicate 的转换中的一个错误,需要修复。
谢谢,费边