Flink multiple sources: Kafka, Kinesis, and the table environment

Posted by t9eec4r0 on 2022-12-09 in Apache

I'm new to Flink and hope someone can help. I have tried to follow Flink tutorials.
We have a requirement where we consume from:

  1. A Kafka topic.
    When an event arrives on the Kafka topic, we need the JSON event fields (mobile_acc_id, member_id, mobile_number, valid_from, valid_to) to be stored in an external DB (Postgres).
  2. A Kinesis stream.
    When an event arrives on the Kinesis stream, we need to look up the event's mobile_number in the Postgres DB (from step 1), extract the "member_id", enrich the incoming Kinesis event with it, and sink the result to another output stream.
    So I set up a Stream and a Table environment like this:
public static StreamExecutionEnvironment initEnv() {
    var env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setAutoWatermarkInterval(0L); // disable automatic watermark generation
    return env;
}

public static TableEnvironment initTableEnv() {
    var settings = EnvironmentSettings.newInstance().inStreamingMode().build();
    return TableEnvironment.create(settings);
}
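Note that `TableEnvironment.create(settings)` builds a table environment with its own, separate execution pipeline, independent of the `StreamExecutionEnvironment` above. If both sources are meant to run inside a single job, one option is to bind the table environment to the stream environment instead. A minimal sketch, assuming the `flink-table-api-java-bridge` dependency is on the classpath:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class EnvFactory {
    // Hypothetical helper: builds a table environment bound to the given
    // stream environment, so SQL pipelines and DataStream pipelines are
    // compiled into one job graph instead of being submitted separately.
    public static StreamTableEnvironment initTableEnv(StreamExecutionEnvironment env) {
        return StreamTableEnvironment.create(env);
    }
}
```

With this variant, `processStreamIntoTable(..)` would receive the bridged environment and its SQL pipeline would be part of the same job as the Kinesis stream.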

Calling the process(..) method with initEnv() uses Kinesis as the source:
process(config.getTransformerConfig(), input, sink, deadLetterSink, initEnv());
In process(..) I am also initialising the table environment using initTableEnv(), hoping that Flink will consume from both sources when I call env.execute(..):

public static void process(TransformerConfig cfg, SourceFunction<String> source, SinkFunction<UsageSummaryWithHeader> sink,
                           SinkFunction<DeadLetterEvent> deadLetterSink, StreamExecutionEnvironment env) throws Exception {
    var events =
            StreamUtils.source(source, env, "kinesis-events", cfg.getInputParallelism());

    collectInSink(transform(cfg, events, deadLetterSink), sink, "kinesis-summary-events", cfg.getOutputParallelism());

    processStreamIntoTable(initTableEnv());

    env.execute("my-flink-event-enricher-svc");
}

private static void processStreamIntoTable(TableEnvironment tableEnv) throws Exception {
    tableEnv.executeSql("CREATE TABLE mobile_accounts (\n" +
            "    mobile_acc_id VARCHAR(36) NOT NULL,\n" +
            "    member_id     BIGINT      NOT NULL,\n" +
            "    mobile_number VARCHAR(14) NOT NULL,\n" +
            "    valid_from    TIMESTAMP   NOT NULL,\n" +
            "    valid_to      TIMESTAMP   NOT NULL\n" +
            ") WITH (\n" +
            "    'connector' = 'kafka',\n" +
            "    'topic'     = 'mobile_accounts',\n" +
            "    'properties.bootstrap.servers' = 'kafka:9092',\n" +
            "    'format'    = 'json'\n" +
            ")");

    tableEnv.executeSql("CREATE TABLE mobile_account\n" +
            "(\n" +
            "    mobile_acc_id VARCHAR(36) NOT NULL,\n" +
            "    member_id     BIGINT      NOT NULL,\n" +
            "    mobile_number VARCHAR(14) NOT NULL,\n" +
            "    valid_from    TIMESTAMP   NOT NULL,\n" +
            "    valid_to      TIMESTAMP   NOT NULL\n" +
            ") WITH (\n" +
            "   'connector'  = 'jdbc',\n" +
            "   'url'        = 'jdbc:postgresql://flinkpg:5432/flink-demo',\n" +
            "   'table-name' = 'mobile_account',\n" +
            "   'driver'     = 'org.postgresql.Driver',\n" +
            "   'username'   = 'flink-demo',\n" +
            "   'password'   = 'flink-demo'\n" +
            ")");

    Table mobileAccounts = tableEnv.from("mobile_accounts");

    report(mobileAccounts).executeInsert("mobile_account");
}

public static Table report(Table mobileAccounts) {
    return mobileAccounts.select(
            $("mobile_acc_id"),
            $("member_id"),
            $("mobile_number"),
            $("valid_from"),
            $("valid_to"));
}

What I have noticed on the Flink console is that the job is only consuming from one source!
I liked the TableEnvironment because very little code is needed to get the items inserted into the DB.
How can we consume from both sources, Kinesis and the TableEnvironment, in Flink?
Am I using the right approach?
Is there an alternative way to implement my requirements?

wgeznvg7


Assuming you are able to create the tables correctly, you can simply JOIN the two streams, named kafka_stream and kinesis_stream, like this:

SELECT l.*, r.member_id FROM kinesis_stream AS l
INNER JOIN kafka_stream AS r
ON l.mobile_number = r.mobile_number;
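If the enrichment should reflect the current contents of the Postgres table rather than a stream-stream join, Flink's JDBC connector also supports lookup joins. A sketch, assuming a processing-time attribute declared on the Kinesis-backed table (`proc_time AS PROCTIME()` in its DDL) and the table names from the question:

```sql
-- Enrich each Kinesis event with member_id looked up in the JDBC table
-- at processing time; mobile_account is the Postgres-backed table.
SELECT k.*, m.member_id
FROM kinesis_stream AS k
JOIN mobile_account FOR SYSTEM_TIME AS OF k.proc_time AS m
ON k.mobile_number = m.mobile_number;
```

A lookup join queries Postgres per key (optionally cached), so the joined values do not need to be kept in Flink state.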

If the PostgreSQL sink is essential, you can set it up in a separate query:

INSERT INTO postgre_sink
SELECT * FROM kafka_stream;

These will solve your problem with the Table API (or Flink SQL).
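To keep both pipelines (Kafka to Postgres, plus the Kinesis enrichment) in a single deployment on the SQL side, the two INSERTs can be grouped into one statement set. A sketch, where `enriched_sink` is a hypothetical sink table for the enriched events:

```sql
EXECUTE STATEMENT SET
BEGIN
  -- pipeline 1: persist Kafka events to Postgres
  INSERT INTO postgre_sink
  SELECT * FROM kafka_stream;

  -- pipeline 2: enrich Kinesis events with member_id by mobile_number
  INSERT INTO enriched_sink
  SELECT l.*, r.member_id
  FROM kinesis_stream AS l
  INNER JOIN kafka_stream AS r
  ON l.mobile_number = r.mobile_number;
END;
```

The equivalent in the Table API is `tableEnv.createStatementSet()` with one `addInsertSql(..)` call per pipeline, followed by a single `execute()`.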
