我是flink(1.3.2)的新手,我有一个问题,想看看这里是否有人能帮忙。
所以我们有一个s3路径,flink正在监视该路径以查看可用的新文件。
val avroInputStream_activity = env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 10000)
我正在做内部和外部检查点,让我们假设有一个坏的文件来的路径和Flink会做几次重试。我想把那些坏文件带到一些错误文件夹,让这个过程继续。但是,由于文件路径保留在检查点中,当我尝试从外部检查点恢复时(我删除了坏文件),它在找不到文件时抛出以下错误。
java.io.IOException: Error opening the Input Split s3a://myfile [0,904]: No such file or directory: s3a://myfile
我有两个问题:
人们如何处理诸如坏文件或记录之类的异常。
有没有办法跳过这个坏文件并从检查点继续?
提前谢谢。
1条答案
按热度按时间qzwqbdag1#
最佳实践是通过捕获任何异常(例如由错误输入数据引起的异常)来保持作业的运行。然后可以使用side输出创建只包含坏记录的输出流。例如,您可以将它们发送到bucketing文件接收器进行进一步分析。