来自ververica flink培训材料的过期状态解决方案之谜

ru9i0ody  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(471)

不幸的是,ververica原来的培训被修改了,重定向到了另一个页面,导致我无法再次回顾这个例子的介绍,我确实找到了一些其他的例子,但是对于这个具体的例子,我没有找到它,这里有一些我最近一直在努力的东西,
对于核心部分代码片段,它如下所示:第一种处理骑乘流的方法

@Override
        public void processElement1(TaxiRide ride, Context context, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            TaxiFare fare = fareState.value();
            TimerService service = context.timerService();
            System.out.println("ride time service current watermark ===> " + service.currentWatermark() + "; timestamp ===>" + context.timestamp());
            System.out.println("ride state ===> " + fare);
            if (fare != null) {
                System.out.println("fare is not null ===>" + fare.rideId);
                fareState.clear();
                context.timerService().deleteEventTimeTimer(fare.getEventTime());
                out.collect(new Tuple2(ride, fare));
            } else {
                System.out.println("update ride state ===> " + ride.rideId + "===>" + context.timestamp());
                rideState.update(ride);
                System.out.println(rideState.value());
                // as soon as the watermark arrives, we can stop waiting for the corresponding fare
                context.timerService().registerEventTimeTimer(ride.getEventTime());
            }
        }

票价流的第二种处理方法

@Override
        public void processElement2(TaxiFare fare, Context context, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            TimerService service = context.timerService();
            System.out.println("fare time service current watermark ===> " + service.currentWatermark() + "; timestamp ===>" + context.timestamp());
            TaxiRide ride = rideState.value();
            System.out.println("fare state ===> " + ride);
            if (ride != null) {
                System.out.println("ride is not null ===> " + ride.rideId);
                rideState.clear();
                context.timerService().deleteEventTimeTimer(ride.getEventTime());
                out.collect(new Tuple2(ride, fare));
            } else {
                System.out.println("update fare state ===> " + fare.rideId + "===>" + context.timestamp());
                fareState.update(fare);
                System.out.println(fareState.value() + "===>" + fareState.value().getEventTime());
                // as soon as the watermark arrives, we can stop waiting for the corresponding ride
                context.timerService().registerEventTimeTimer(fare.getEventTime());
            }
        }

processelement1显然是用于出租车流的,2是用于出租车费的,第一件事是它在执行processelement1之前会运行processelement2一段时间,直到现在我才找到原因,这里是打印部分

fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
fare state ===> null
fare state ===> null
update fare state ===> 26===>1356998400000
update fare state ===> 58===>1356998400000
58,2013000058,2013000058,2013-01-01 00:00:00,CRD,2.0,0.0,27.0===>1356998400000
26,2013000026,2013000026,2013-01-01 00:00:00,CRD,2.0,0.0,12.5===>1356998400000
fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
fare state ===> null
update fare state ===> 9===>1356998400000
fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
fare state ===> null
update fare state ===> 47===>1356998400000
9,2013000009,2013000009,2013-01-01 00:00:00,CRD,1.0,0.0,6.0===>1356998400000
47,2013000047,2013000047,2013-01-01 00:00:00,CRD,0.9,0.0,5.9===>1356998400000
fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
fare state ===> null
update fare state ===> 54===>1356998400000
fare time service current watermark ===> -9223372036854775808; timestamp ===>1356998400000
54,2013000054,2013000054,2013-01-01 00:00:00,CSH,0.0,0.0,31.0===>1356998400000

第二个原因是,因为valuestate是一个值而不是一个包含很多值的列表,所以每次调用processelemnt2时,如果ride为null,它会转到else,在调用farestate.update()之后,它会改变valuestate的值,从我的Angular 看,这意味着它认为valuestate的前一个值是match,对吧-----最大的难题thx的回答,我真的很感谢你的帮助!

mrzz3bfm

mrzz3bfm1#

有关状态和连接流的新教程可以帮助您解答问题。但简而言之:
你无法控制 processElement1 以及 processElement2 将调用回调。这两个输入流正在相互竞争,而flink运行时将在使用一个流或另一个流中的事件方面做它想做的事情。在计时和/或排序很重要的情况下,您可能会发现有必要以托管flink状态缓冲事件,直到您的应用程序准备好处理它们为止。
valuestate是一种键控状态,这意味着无论何时访问或更新状态,都会读取或写入状态后端中上下文中键的条目。“上下文中的键”是正在处理的流元素的键(对于 processElement 回调),或用于创建计时器的键(对于 onTimer 回调)。
另外,请记住,在这个练习中,每个钥匙最多有一个出租车和一个出租车费。
本练习的参考解决方案说明了一种思考如何管理可能会泄漏的状态的方法,但这种情况下没有一个明显正确的答案。这个练习的目的是激发一些关于如何使用状态和计时器的思考,并将一些涉及到的问题展现出来。
好的解决方案的目标是什么?应该的
产生正确的结果
非泄漏状态
容易理解
有良好的表现
现在,让我们在考虑这些目标的情况下检查建议的解决方案。我们在 processElement1 (顺便说一下, processElement2 是一样的,只是角色在乘车和乘车之间颠倒了):

public void processElement1(TaxiRide ride, Context context, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
    TaxiFare fare = fareState.value();
    if (fare != null) {
        fareState.clear();
        context.timerService().deleteEventTimeTimer(fare.getEventTime());
        out.collect(new Tuple2(ride, fare));
    } else {
        rideState.update(ride);
        // as soon as the watermark arrives, we can stop waiting for the corresponding fare
        context.timerService().registerEventTimeTimer(ride.getEventTime());
    }
}

这意味着
每当到达一个没有完成一对的事件时,我们将其存储在状态中并创建一个计时器
每当一个完成配对的事件到达时,我们清除状态并删除匹配事件的计时器(之前存储的)
所以很明显,如果这两个事件都发生了,什么也不会泄露。但是如果一个不见了呢?
在这种情况下,计时器将在某个点触发,并运行此代码,这将清除可能存在的任何状态:

public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
    if (fareState.value() != null) {
        ctx.output(unmatchedFares, fareState.value());
        fareState.clear();
    }
    if (rideState.value() != null) {
        ctx.output(unmatchedRides, rideState.value());
        rideState.clear();
    }
}

好吧,但是我们怎么决定要等多久呢?等一等就够了吗 ride.getEventTime() ?
设置事件时间计时器的效果 ride.getEventTime() 是要等到车流和车费流中的任何混乱问题得到解决。所有早期的乘车和票价活动将在水印到达时到达 ride.getEventTime() ,假设水印是完美的。
在这些练习中,水印实际上是完美的——不可能有延迟事件。但是在现实环境中,您应该预料到一些后期事件,并且我们应该预料到我们的实现在这种情况下的行为是正确的。此参考溶液的作用是:
匹配对中的一个事件将首先到达,并创建一个计时器来安排其最终删除
计时器将启动,事件将被清除
匹配的事件延迟到达,并创建另一个计时器,在本例中是针对已经过去的时间
下一个到达的水印触发计时器,并且状态被清除
换句话说,当事件延迟时,不会泄漏任何状态,但不会生成结果连接。因此,如果您希望在数据延迟到达的情况下仍能生成结果,则应该创建计时器,通过将必要的状态保留一段时间来适应某些延迟,例如。,

context.timerService().registerEventTimeTimer(ride.getEventTime() + ALLOWED_LATENESS);

尝试容纳任意的延迟事件不是一个好主意,因为这样做需要无限期地为每个延迟事件保留一些状态。
不如改用处理时间计时器?
当然,这是可行的,但测试起来可能会更尴尬。
为什么不用状态时间来代替生活呢?
好主意。一般来说,您可能需要考虑使用状态ttl实现gdpr遵从性(例如),并使用计时器实现业务逻辑。

相关问题