flink:datastream左联接表超级简单

n53p2ov0  于 2021-07-15  发布在  Flink
关注(0)|答案(1)|浏览(302)
DataStream<String> sourceStream = streamEnv.fromElements("key_a", "key_b", "key_c", "key_d");

        Table lookupTable = tableEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("my_key", DataTypes.STRING()),
                        DataTypes.FIELD("my_value", DataTypes.STRING())
                ),
                Expressions.row("key_a", "value_a"),
                Expressions.row("key_b", "value_b")
        );

我想把这条小溪留在table上。
这显然是一个简化的演示场景。在使用更大的生产数据集之前,我想了解如何使用flinkapi通过玩具数据集实现这一点。
“表联接上的文档”演示了如何联接两个表并取回另一个表,这不是我想要的:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableapi.html#joins
datastream joins上的docs显示在一个时间窗口上连接两个流,这也不是我想要的:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/joining.html

yuvru6vn

yuvru6vn1#

我相信这就是你要找的。此示例将sourcestream转换为动态表,将其与查找表联接,然后将生成的动态表转换回流以进行打印。
相反,您可以使用datastreamapi对resultstream进行进一步处理。

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class JoinExample {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStream<String> sourceStream = env.fromElements("key_a", "key_b", "key_c", "key_d");
        Table streamTable = tableEnv.fromDataStream(sourceStream, $("stream_key"));

        Table lookupTable = tableEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("lookup_key", DataTypes.STRING()),
                        DataTypes.FIELD("lookup_value", DataTypes.STRING())
                ),
                Expressions.row("key_a", "value_a"),
                Expressions.row("key_b", "value_b")
        );

        Table resultTable = streamTable
                .join(lookupTable).where($("stream_key").isEqual($("lookup_key")))
                .select($("stream_key"), $("lookup_value"));

        DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);

        resultStream.print();

        env.execute();
    }
}

输出为

key_b,value_b
key_a,value_a

相关问题