在flink job中通过两条流与主流进行操作

k4aesqcs  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(426)

目前在flink job中,我有两个流,一个是kafka主题每分钟更新一次的主数据流,另一个是keyedbroadcastprocessfunction的进程元素函数中使用的流(broadcast stream),用于对主流数据进行一些计算。
现在我有一个新的需求,添加一个与其他两个流在结构上完全不同的流。
1) 如何传递必须在flink状态下可用的第三个流,以便与主数据和广播状态数据一起进行计算?在keyedbroadcastprocess函数中?
2) 我们能有两个广播流作为主数据吗?
3) 连接将不起作用,因为流数据是完全不同的数据、广播和第三数据流没有更频繁地改变。它就像一个主数据,这是在计算中使用的主要数据流无法找到任何解决方案,但请帮助。请分享一些我可以参考的链接。

iq3niunx

iq3niunx1#

flink不提供任何类型的具有三个输入的过程函数。
您可以将两个广播流合并在一起(在广播它们之前)。我很欣赏他们是非常不同的类型,但你总能找到一些方法让他们共存。如果没有更自然的方法来统一这两种类型,那么您可以使用这两种方法中的任何一种。要将两个不同的类型合并到一个流中,可以执行以下操作:

DataStream<String> strings = env.fromElements("one", "two", "three");
DataStream<Integer> ints = env.fromElements(1, 2, 3);

DataStream<Either<String, Integer>> stringsOnTheLeft = strings
        .map(new MapFunction<String, Either<String, Integer>>() {
            @Override
            public Either<String, Integer> map(String s) throws Exception {
                return Either.Left(s);
            }
        });

DataStream<Either<String, Integer>> intsOnTheRight = ints
        .map(new MapFunction<Integer, Either<String, Integer>>() {
            @Override
            public Either<String, Integer> map(Integer i) throws Exception {
                return Either.Right(i);
            }
        });

DataStream<Either<String, Integer>> stringsAndInts = stringsOnTheLeft.union(intsOnTheRight);

或者,如果可以在不同的阶段将广播流应用于主流,则可以有两个KeyedBroadcastProcessFunction的序列,其中一个函数的输出馈送到另一个函数:

events
    .keyBy(x -> x.foo)
    .connect(broadcast1)
    .process(new process1())
    .keyBy(x -> x.foo)
    .connect(broadcast2)
    .process(new process2())

更新:
如果我们像这样合并并广播,如果任何更新到任何流将更新广播状态,或者它将在广播状态中创建一个新条目?
那完全在你的控制之下。广播状态总是Map状态;我想你会选择某种简单的键来工作,所以你会有 MapState<String, Either<T1, T2>> . map state的工作方式与任何hashmap类似:如果重用一个键,它将替换条目;如果引入一个新键,它将创建一个新条目。
... 如何向[广播]流提供这些流的公共密钥?
钥匙不一定是一样的,只要是同一种类型就行了。

相关问题