flink基于另一个流确定地过滤流

vc6uscn9  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(561)

我在flink中有两个数据流(带有公共时间戳和kafka),其中一个包含一些信号值,另一个包含活动(简单的active-inactive)信息。我试过了 RichCoProcessFunction 一个简单的状态 private ValueState<Boolean> seen; 结果是不确定的。如果我使用 startFromEarliest 我有时会过滤不同的值。我怎样才能使它具有确定性?我在分享我的 KeyedCoProcessFunction 下面是骷髅。

private ValueState < Boolean > seen;

@Override
public void open(Configuration parameters) throws Exception {
    ValueStateDescriptor < Boolean > descriptor = new ValueStateDescriptor < > (
        // state name
        "have-seen-key",
        // type information of state
        TypeInformation.of(new TypeHint < Boolean > () {}));
    seen = getRuntimeContext().getState(descriptor);
}

@Override
public void processElement1(SomeEvent < Double > value, Context ctx, Collector < SomeEvent < Double >> out) throws Exception {
    if (seen.value() == Boolean.TRUE) {
        out.collect(value);
    }
}

@Override
public void processElement2(SomeEvent < Double > value, Context ctx, Collector < SomeEvent < Double >> out) throws Exception {
    if (value.value == 1) {
        seen.update(Boolean.TRUE);

    } else {
        seen.update(Boolean.FALSE);
    }

}
q3aa0525

q3aa05251#

实现所需类型的事件时间连接可以作为 RichCoProcessFunction ,但可能有点复杂。您可能更愿意将其实现为带有时态表函数的联接。

weylhg0b

weylhg0b2#

不确定的原因是这两个源以不同的速度生成元素。使其更具确定性的最简单方法是使用eventtime。这意味着您需要同时为控制记录和数据记录分配时间戳。flink将为元素发射水印。
然后您可以简单地缓冲并等待发射或丢弃元素,直到您接收到控制流的水印,这意味着控制流中没有任何变化。
如果没有时间戳,在这种情况下几乎不可能引入确定性行为,因为您永远无法准确地判断给定记录何时到达,哪些记录应该删除,哪些记录应该发出。

相关问题