假设我有一个flink作业正在处理1、2、control_flag、3...等数据流。当满足control_flag时,作业应停止并保存到保存点,并且不应处理或删除以下消息3...。当在flink之外执行中心操作并从保存点重新启动作业时,作业应继续处理以下消息。但是,如果作业在进程操作器中挂起,并存在一个休眠循环,以阻止处理以下消息,则无法使用flink API通过savepoint停止作业。那么如何在control_flag位置停止作业,并让作业在其旁边的位置重新启动?
假设我有一个flink作业正在处理1、2、control_flag、3...等数据流。当满足control_flag时,作业应停止并保存到保存点,并且不应处理或删除以下消息3...。当在flink之外执行中心操作并从保存点重新启动作业时,作业应继续处理以下消息。但是,如果作业在进程操作器中挂起,并存在一个休眠循环,以阻止处理以下消息,则无法使用flink API通过savepoint停止作业。那么如何在control_flag位置停止作业,并让作业在其旁边的位置重新启动?
1条答案
按热度按时间lrl1mhuk1#
一些建议可以在here中找到。
有几种可能的方法可以做到这一点,但我认为,既然您希望在运行之间保持状态,最好的想法是使用一个操作符:
stop_execution
为假,则处理数据并将其输出给下游操作员。stop_execution
为真,则将接收到的数据添加到列表状态。control_flag
,则会发出侧输出,表示应停止作业。现在由您来监听端输出,它可以是从Kafka读取数据并执行正确的REST调用以停止给定作业的外部服务,也可以是您想要的任何其他服务。