在Spark Streaming中如何处理旧数据和删除已处理的数据

50pmv0ei  于 2023-11-21  发布在  Apache
关注(0)|答案(3)|浏览(185)

我们正在运行一个Spark流作业,它从一个目录中检索文件(使用textFileStream)。我们担心的一个问题是,作业已经关闭,但文件仍然被添加到目录中。一旦作业再次启动,这些文件不会被拾取(因为它们不是新的或在作业运行时更改的),但我们希望它们被处理。
1)有没有解决方案?有没有办法跟踪哪些文件已经处理,我们可以“强制”旧文件被拾起?
2)有没有办法删除已处理的文件?

fgw7neuy

fgw7neuy1#

下面的文章几乎涵盖了你所有的问题。
https://blog.yanchen.ca/2016/06/28/fileinputdstream-in-spark-streaming/

  • 1)是否有解决方案?是否有办法跟踪已处理的文件,我们可以“强制”拾取旧文件?*

流读取器在启动作业/应用程序时使用系统时钟启动批处理窗口。显然,之前创建的所有文件都将被忽略。请尝试启用检查点设置

  • 2)是否有方法删除已处理的文件?*

删除文件可能是不必要的。如果检查点工作,Spark会识别未处理的文件。如果由于某种原因要删除文件,请实现自定义输入格式和读取器(请参考文章)来捕获文件名并适当使用此信息。但我不推荐这种方法。

v6ylcynt

v6ylcynt2#

你第二个问题的答案,
现在在Spark 3中已经可以了。你可以使用“cleanSource”选项来读取流。
感谢文档https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html和此视频https://www.youtube.com/watch?v=EM7T34Uu2Gg
经过几个小时的寻找,终于找到了解决办法

gmxoilav

gmxoilav3#

  • 是否有方法删除已处理的文件?*

根据我的经验,我不能使用检查点功能,所以我不得不删除/移动已经进入每个批次的处理过的文件。
获取这些文件的方法有点棘手,但基本上我们可以说它们是当前RDD的祖先(依赖项)。然后我使用的是一种递归方法,该方法爬取该结构并恢复开始以hdfs开头的RDD的名称。

/**
    * Recursive method to extract original metadata files involved in this batch.
    * @param rdd Each RDD created for each batch.
    * @return All HDFS files originally read.
    */
   def extractSourceHDFSFiles(rdd: RDD[_]): Set[String] = {

     def extractSourceHDFSFilesWithAcc(rdd: List[RDD[_]]) : Set[String] = {

      rdd match {
        case Nil => Set()
        case head :: tail => {
          val name = head.toString()
          if (name.startsWith("hdfs")){
            Set(name.split(" ")(0)) ++ extractSourceHDFSFilesWithAcc(head.dependencies.map(_.rdd).toList) ++ extractSourceHDFSFilesWithAcc(tail)
          }
          else {
            extractSourceHDFSFilesWithAcc(head.dependencies.map(_.rdd).toList) ++ extractSourceHDFSFilesWithAcc(tail)
          }
        }
      }
    }

    extractSourceHDFSFilesWithAcc(rdd.dependencies.map(_.rdd).toList)
  }

字符串
因此,在forEachRDD方法中,您可以轻松地调用它:

stream.forEachRDD(rdd -> {

     val filesInBatch = extractSourceHDFSFiles(rdd)
    logger.info("Files to be processed:")

    // Process them

    // Delete them when you are done
})

相关问题