我试着在flink任务完成后做一个动作(在db中做一些更改)。我想在同一个flink应用程序中完成它,但没有运气。我发现有一个jobstatuslistener在executiongraph中被通知更改了状态,但是我找不到如何让这个executiongraph注册我的侦听器。我已经尝试在我的项目中完全替换executiongraph(是的,方法不好,但是…),但是只要它是运行时库,就不会在分布式模式下调用它,只在本地运行时调用它。
我有下一份flink申请简而言之:
DataSource.output(RichOutputFormat.class)
ExecutionEnvironment.getExecutionEnvironment().execute()
有人能帮忙吗?
暂无答案!
目前还没有任何答案,快来回答吧!