我的要求是,我有一个多hdfs的位置,从Kafka摄取文件每小时。因此,对于每个目录,如何将特定时间戳的所有文件合并到当前时间戳,作为单个Parquet文件,下次仅将上次合并的时间戳的文件合并到当前时间戳,并在将来重复相同的操作。这就是我在sparkscala工作中所要做的,所以不能使用普通的shell脚本。如有任何建议,我们将不胜感激。
k5hmc34c1#
下面是一段代码片段,可以帮助您完成这项工作。第一步是以Map的形式获取每个日期的文件列表。 (Map[String, List[String]]) 其中key是date,value是具有相同日期的文件列表。日期取自hdfs文件的修改时间戳。注意:使用本地路径测试代码,根据需要提供正确的hdfs路径/url。在编写输出时,没有直接选项来指定目标文件名,但是可以指定特定于每个日期的目标目录。代码使我们可以使用文件系统api将文件重命名为所需的名称,并删除每个日期创建的临时输出文件夹。
(Map[String, List[String]])
import org.apache.hadoop.conf.Configuration import org.apache.spark.sql.SparkSession import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.spark.SparkContext import org.joda.time.format.DateTimeFormat object MergeFiles { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .appName("Merging files day wise in a directory") .master("local[2]") .getOrCreate() val inputDir = "/Users/sujesh/test_data" val outputDir = "/Users/sujesh/output_data" val hadoopConf = spark.sparkContext.hadoopConfiguration val fs = FileSystem.get(hadoopConf) val filesPerDate = getFiles(inputDir, fs) filesPerDate .foreach { m => spark .read .format("csv") .option("inferSchema", false) .option("header", false) .load(m._2:_*) .repartition(1) .write .format("csv") .save(s"$outputDir/${m._1}") val file = fs.globStatus(new Path(s"$outputDir/${m._1}/part*.csv"))(0).getPath.getName fs.rename(new Path(s"$outputDir/${m._1}/$file"), new Path(s"$outputDir/${m._1}.csv")) fs.delete(new Path(s"$outputDir/${m._1}"), true) } } /* Get the list of files group by date date is taken from file's modification timestamp */ def getFiles(dir: String, fs: FileSystem) = { fs .globStatus(new Path(s"$dir/*.csv")) .map { f: FileStatus => (DateTimeFormat.forPattern("yyyyMMdd").print(f.getModificationTime), f.getPath.toUri.getRawPath) }.groupBy(_._1) .map { case (k,v) => (k -> v.map(_._2).toSeq) } } }
您可以在测试之后进一步优化代码,如果需要重用,可以将文件重命名代码转换为util。把所有的选项 inferSchema 或者 header 错误的。你需要什么就用什么。这种方法也适用于其他格式的文件。注意:如果您在同一目录中重复执行此过程,则需要进一步调整,因为新创建的文件将具有最新的时间戳。因此,如果这不是每天运行的,那么您也需要显式地更新文件的修改时间戳,或者忽略具有文件名模式的文件,例如 yyyyMMdd.csv
inferSchema
header
yyyyMMdd.csv
1条答案
按热度按时间k5hmc34c1#
下面是一段代码片段,可以帮助您完成这项工作。
第一步是以Map的形式获取每个日期的文件列表。
(Map[String, List[String]])
其中key是date,value是具有相同日期的文件列表。日期取自hdfs文件的修改时间戳。注意:使用本地路径测试代码,根据需要提供正确的hdfs路径/url。
在编写输出时,没有直接选项来指定目标文件名,但是可以指定特定于每个日期的目标目录。代码使我们可以使用文件系统api将文件重命名为所需的名称,并删除每个日期创建的临时输出文件夹。
您可以在测试之后进一步优化代码,如果需要重用,可以将文件重命名代码转换为util。把所有的选项
inferSchema
或者header
错误的。你需要什么就用什么。这种方法也适用于其他格式的文件。注意:如果您在同一目录中重复执行此过程,则需要进一步调整,因为新创建的文件将具有最新的时间戳。因此,如果这不是每天运行的,那么您也需要显式地更新文件的修改时间戳,或者忽略具有文件名模式的文件,例如
yyyyMMdd.csv