我有一个flink批处理作业,它从s3读取一个非常大的Parquet文件,然后将json放入kafka主题。
问题是如何使文件读取过程有状态?我的意思是,每当作业中断或崩溃时,作业应该从以前的阅读状态开始?我不想发送重复的项目Kafka时,作业重新启动。
下面是我的示例代码
val env = ExecutionEnvironment.getExecutionEnvironment
val input = Parquet.input[User](new Path(s"s3a://path"))
env.createInput(input)
.filter(r => Option(r.token).getOrElse("").nonEmpty)
暂无答案!
目前还没有任何答案,快来回答吧!