apacheflink从s3有状态读取文件

9avjhtql  于 2021-06-21  发布在  Flink
关注(0)|答案(0)|浏览(252)

我有一个flink批处理作业,它从s3读取一个非常大的Parquet文件,然后将json放入kafka主题。
问题是如何使文件读取过程有状态?我的意思是,每当作业中断或崩溃时,作业应该从以前的阅读状态开始?我不想发送重复的项目Kafka时,作业重新启动。
下面是我的示例代码

val env = ExecutionEnvironment.getExecutionEnvironment
val input = Parquet.input[User](new Path(s"s3a://path"))
env.createInput(input)
  .filter(r => Option(r.token).getOrElse("").nonEmpty)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题