scala-如何合并hdfs位置的增量文件

apeeds0o  于 2021-05-19  发布在  Spark
关注(0)|答案(1)|浏览(1015)

我的要求是,我有一个多hdfs的位置,从Kafka摄取文件每小时。因此,对于每个目录,如何将特定时间戳的所有文件合并到当前时间戳,作为单个Parquet文件,下次仅将上次合并的时间戳的文件合并到当前时间戳,并在将来重复相同的操作。这就是我在sparkscala工作中所要做的,所以不能使用普通的shell脚本。如有任何建议,我们将不胜感激。

k5hmc34c

k5hmc34c1#

下面是一段代码片段,可以帮助您完成这项工作。
第一步是以Map的形式获取每个日期的文件列表。 (Map[String, List[String]]) 其中key是date,value是具有相同日期的文件列表。日期取自hdfs文件的修改时间戳。
注意:使用本地路径测试代码,根据需要提供正确的hdfs路径/url。
在编写输出时,没有直接选项来指定目标文件名,但是可以指定特定于每个日期的目标目录。代码使我们可以使用文件系统api将文件重命名为所需的名称,并删除每个日期创建的临时输出文件夹。

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.SparkContext
import org.joda.time.format.DateTimeFormat

object MergeFiles {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Merging files day wise in a directory")
      .master("local[2]")
      .getOrCreate()

    val inputDir = "/Users/sujesh/test_data"
    val outputDir = "/Users/sujesh/output_data"

    val hadoopConf = spark.sparkContext.hadoopConfiguration
    val fs = FileSystem.get(hadoopConf)

    val filesPerDate = getFiles(inputDir, fs)

    filesPerDate
      .foreach { m =>
        spark
          .read
          .format("csv")
          .option("inferSchema", false)
          .option("header", false)
          .load(m._2:_*)
          .repartition(1)
          .write
          .format("csv")
          .save(s"$outputDir/${m._1}")

        val file = fs.globStatus(new Path(s"$outputDir/${m._1}/part*.csv"))(0).getPath.getName
        fs.rename(new Path(s"$outputDir/${m._1}/$file"), new Path(s"$outputDir/${m._1}.csv"))
        fs.delete(new Path(s"$outputDir/${m._1}"), true)
      }
  }

  /*
    Get the list of files group by date
    date is taken from file's modification timestamp
   */
  def getFiles(dir: String, fs: FileSystem) = {
    fs
      .globStatus(new Path(s"$dir/*.csv"))
      .map { f: FileStatus =>
        (DateTimeFormat.forPattern("yyyyMMdd").print(f.getModificationTime), f.getPath.toUri.getRawPath)
       }.groupBy(_._1)
       .map { case (k,v) => (k -> v.map(_._2).toSeq) }
  }
}

您可以在测试之后进一步优化代码,如果需要重用,可以将文件重命名代码转换为util。把所有的选项 inferSchema 或者 header 错误的。你需要什么就用什么。这种方法也适用于其他格式的文件。
注意:如果您在同一目录中重复执行此过程,则需要进一步调整,因为新创建的文件将具有最新的时间戳。因此,如果这不是每天运行的,那么您也需要显式地更新文件的修改时间戳,或者忽略具有文件名模式的文件,例如 yyyyMMdd.csv

相关问题