apache-flink:dataset api外部连接中的nullpointerexception

hgc7kmma  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(394)

我试图在flink的datasetapi中实现以下简单查询。

select 
    t1_value1 
from  
    table1 
where  
    t1_suppkey not in ( 
        select  
            t2_suppkey
        from  
            table2
     )

所以我的想法是执行一个left-outer连接(表1.leftouterjoin(表2)…),然后删除所有得到t1_suppkey和t2_suppkey值的行。
所以我试着这样:

output = table1
    .leftOuterJoin(table2).where("t1_suppkey").equalTo("t2_suppkey")
    .with((Table1 t1, Table2 t2) -> new Tuple2<>(t1.ps_suppkey, t2.s_suppkey))
    .returns(new TypeHint <Tuple2<Integer, Integer>>() {});

但是,如果我这样做,它总是会失败“java.lang.nullpointerexception”,我不知道为什么。如果我使用普通连接而不是左外连接,代码可以工作,但这不是我想要的。
我是否需要以不同的方式实现左连接,或者是否有更简单的方法重写DataSetaAPI中的“notin”语句?

tyu7yeag

tyu7yeag1#

dataset api的外部联接调用 JoinFunction 也适用于在内侧找不到连接记录的外部记录。在这种情况下 JoinFunction.join() 方法被调用 null .
因为您使用的是左外连接,所以第二个参数 Table2 t2 可以是 null . 这个 NullPointerException 是由 t2.s_suppkey . 你需要检查一下 t2 == null 只有进入 t2 如果不为空。
您还可以使用 FlatJoinFunction 有一个 Collector 参数和仅发射 t1 如果 t2 == null .
另一个选择是使用flink的批处理sql支持,它支持示例中的查询。

5w9g7ksd

5w9g7ksd2#

output = table1
.leftOuterJoin(table2)
.where("t1_suppkey").equalTo("t2_suppkey") 
.with((Table1 t1, Table2 t2, Collector<Tuple2<Integer, Integer>> c) -> { 
if(t2 == null) {
    c.collect(new Tuple2<>(t1.t1_suppkey, t1.t1_value1)); 
} 
else { 
    //Do nothing. 
}})

相关问题