Flink:数据流上没有外部连接?

ncecgwcz  于 2021-06-25  发布在  Flink
关注(0)|答案(2)|浏览(371)

我很惊讶地发现,没有外部连接 DataStream 在flink(datastream docs)中。
为了 DataSet 你有所有的选择: leftOuterJoin , rightOuterJoin 以及 fullOuterJoin ,除了常规的 join (数据集文档)。但对于 DataStream 你只是有个普通的老朋友。
这是因为 DataStream 使得不可能有外部连接?或者也许我们可以期待在(接近?)的未来?
我真的需要一个外部连接 DataStream 对于我正在研究的问题。。。有没有办法达到类似的行为?

jslywgbw

jslywgbw1#

可以使用 DataStream.coGroup() 转变。一 CoGroupFunction 接收两个迭代器(每个输入一个迭代器),为某个键的所有元素提供服务,如果找不到匹配的元素,则该迭代器可能为空。这允许实现外部连接功能。
在flink的下一个版本中,可能会将对外部联接的一流支持添加到datastreamapi中。我目前还不知道有这样的努力。但是,在apache flink jira中创建一个问题可能会有所帮助。

gfttwv5a

gfttwv5a2#

一种方法是从流->表->流出发,使用以下api:flinkTableAPI-outerJoin
下面是一个java示例:

DataStream<String> data = env.readTextFile( ... );
    DataStream<String> data2Merge = env.readTextFile( ... );

    ...

    tableEnv.registerDataStream("myDataLeft", data, "left_column1, left_column2");
    tableEnv.registerDataStream("myDataRight", data2Merge, "right_column1, right_column2");

    String queryLeft = "SELECT left_column1, left_column2 FROM myDataLeft";
    String queryRight = "SELECT right_column1, right_column2 FROM myDataRight";

    Table tableLeft = tableEnv.sqlQuery(queryLeft);
    Table tableRight = tableEnv.sqlQuery(queryRight);

    Table fullOuterResult = tableLeft.fullOuterJoin(tableRight, "left_column1 == right_column1").select("left_column1, left_column2, right_column2");
    DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(fullOuterResult, Row.class);

相关问题