apache-flink:为datastreamapi添加端输入

axzmvihb  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(347)

在我的java应用程序中,我有三个数据流。例如,对于一个流,数据从kafka消费,对于另一个流,数据从apachenifi消费。对于这两个流,对象类型不同。例如,stream-1对象类型是person,stream-2对象类型是address。
第三个是广播流(因为这些数据来自Kafka)。
现在我想在一个job类中组合stream-1和stream-2,并在task process元素中进行拆分。如何实施?
注:流1为主流,流2为侧输入。主流媒体不断从Kafka那里获取数据。对于侧输入,在应用程序启动时,首先从数据库加载所有表数据,然后在更新表数据时读取新数据(不经常)。
样本结构:

DataStream<Person> stream-1 = env.addSource(read data from kafka)....
DataStream<Address> stream-2 = env.addSource(read data from nifi)....
BroadcastStream<String> BroadCastStream = stream-3.broadcast(read data from kafka);

我被称为以下链接。
数据流api的翻转17侧输入
吉拉/浏览/Flink-6131
我的用例是:
使用缓慢演变的数据连接流:我们用于丰富的边输入随着时间的推移而演变(数据从数据库中读取)。这可以通过在处理主输入之前等待一些初始数据可用来完成,并且在新数据到达时不断地将其摄取到内部侧输入结构中。

u1ehiz5o

u1ehiz5o1#

根据最新的答复,@arvid的建议实际上是这里所需要的。
答案的核心:
即使stream1和stream2的类型不同,您也可以轻松地将它们连接起来。然后可以将广播添加到结果中
指向文档和示例的链接,以及文档中的相关片段(示例太长,无法包含在此处):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

相关问题