How do I send an unbounded TableResult to a Kafka sink in Flink?

b5lpy0ml  asked on 2022-12-09  in Apache

I am creating two streams with the Table API; call them A and B. I join the two tables with executeSql, and the output comes back as a TableResult. I want to send the join result to a Kafka sink. See the code below.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String ddlUser = "CREATE TABLE UserTable (\n" +
        "id BIGINT,\n" +
        "name STRING\n" +
        ") WITH (\n" +
        "'connector' = 'kafka',\n" +
        "'topic' = 'USER',\n" +
        "'properties.bootstrap.servers' = 'pkc:9092',\n" +
        "'properties.group.id' = 'testGroup',\n" +
        "'scan.startup.mode' = 'earliest-offset',\n" +
        "'format' = 'json',\n" +
        "'properties.security.protocol' = 'SASL_SSL',\n" +
        "'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\" password=\"\";',\n" +
        "'properties.sasl.mechanism' = 'PLAIN'\n" +
        ")";
tEnv.executeSql(ddlUser);
String ddlPurchase = "CREATE TABLE PurchaseTable (\n" +
        "transactionId BIGINT,\n" +
        "userID BIGINT,\n" +
        "item STRING\n" +
        ") WITH (\n" +
        "'connector' = 'kafka',\n" +
        "'topic' = 'PURCHASE',\n" +
        "'properties.bootstrap.servers' = 'pkc:9092',\n" +
        "'properties.group.id' = 'purchaseGroup',\n" +
        "'scan.startup.mode' = 'earliest-offset',\n" +
        "'format' = 'json',\n" +
        "'properties.security.protocol' = 'SASL_SSL',\n" +
        "'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\" password=\"\";',\n" +
        "'properties.sasl.mechanism' = 'PLAIN'\n" +
        ")";
tEnv.executeSql(ddlPurchase);
String useQuery = "SELECT * FROM UserTable";
String purchaseQuery = "SELECT * FROM PurchaseTable JOIN UserTable ON PurchaseTable.userID = UserTable.id";
TableResult joinedData = tEnv.executeSql(purchaseQuery);

How can I send this unbounded TableResult to a Kafka sink?

2jcobegt1#

You need to insert into a destination table that is also backed by the Kafka connector: https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/common/#emit-a-table
In that example they create a temporary table, but, as you have already done for your sources, you can create a table with the Kafka connector (https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kafka/) and insert the joined stream into it. I haven't tested this, but it should be something like the following:

// insertInto returns a TablePipeline; calling execute() submits the job
tEnv.sqlQuery(purchaseQuery).insertInto("DestinationTable").execute();

or

tEnv.executeSql("INSERT INTO DestinationTable SELECT * FROM PurchaseTable JOIN UserTable ON PurchaseTable.userID = UserTable.id");
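
Both options require DestinationTable to exist first. Here is a minimal sketch of that sink DDL, reusing the connection options from the question; the table name DestinationTable, the JOINED topic, and the flattened column list are assumptions, untested:

// Hypothetical sink table for the join result; adjust the columns, topic,
// and credentials to your setup.
String ddlDestination = "CREATE TABLE DestinationTable (\n" +
        "transactionId BIGINT,\n" +
        "userID BIGINT,\n" +
        "item STRING,\n" +
        "id BIGINT,\n" +
        "name STRING\n" +
        ") WITH (\n" +
        "'connector' = 'kafka',\n" +
        "'topic' = 'JOINED',\n" +
        "'properties.bootstrap.servers' = 'pkc:9092',\n" +
        "'format' = 'json',\n" +
        "'properties.security.protocol' = 'SASL_SSL',\n" +
        "'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\" password=\"\";',\n" +
        "'properties.sasl.mechanism' = 'PLAIN'\n" +
        ")";
tEnv.executeSql(ddlDestination);

An inner join of two append-only Kafka sources is itself append-only, so the plain 'kafka' connector works as a sink here; an updating result would need the 'upsert-kafka' connector (with a PRIMARY KEY) instead.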
