How to create multiple DataFrames from multiple lists in Scala Spark

xmjla07d · asked 2021-05-29 · in Spark

I am trying to create multiple DataFrames from the two lists below:

val paths = ListBuffer("s3://abc_xyz_tableA.json",
                       "s3://def_xyz_tableA.json",
                       "s3://abc_xyz_tableB.json",
                       "s3://def_xyz_tableB.json",
                       "s3://abc_xyz_tableC.json",....)

val tableNames = ListBuffer("tableA","tableB","tableC","tableD",....)

I want to create a different DataFrame for each table name by grouping together all the S3 paths that end with the same table name, since each table has its own unique schema.

So, for example, if the tables and their related paths are grouped together:

"tableADF" should hold all the data from "s3://abc_xyz_tableA.json" and "s3://def_xyz_tableA.json", since both paths contain "tableA".

"tableBDF" should hold all the data from "s3://abc_xyz_tableB.json" and "s3://def_xyz_tableB.json", since both paths contain "tableB".

...and so on; there can be many table names and paths.
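To make the intent concrete, the grouping step alone can be sketched in plain Scala (assuming the table name is always the last underscore-separated token of the file name); it is turning these groups into DataFrames that I am stuck on:

import scala.collection.mutable.ListBuffer

val paths = ListBuffer(
  "s3://abc_xyz_tableA.json",
  "s3://def_xyz_tableA.json",
  "s3://abc_xyz_tableB.json",
  "s3://def_xyz_tableB.json",
  "s3://abc_xyz_tableC.json")

// Group paths by the table name at the end of each file name,
// e.g. "s3://abc_xyz_tableA.json" -> "tableA".
val pathsByTable: Map[String, ListBuffer[String]] =
  paths.groupBy(p => p.split("_").last.stripSuffix(".json"))
// Map(tableA -> ListBuffer(...both tableA paths...), tableB -> ..., tableC -> ...)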

I have tried different approaches but have not succeeded so far. Any pointers toward the desired solution would be greatly appreciated. Thanks!

tf7tbtn2 1#

Using the built-in input_file_name() function, you can filter on the file name to obtain a DataFrame per file / file pattern:

import org.apache.spark.sql.functions._
import spark.implicits._
var df = spark.read.format("json").load("s3://data/*.json")
df = df.withColumn(
  "input_file", input_file_name()
)

val tableADF= df.filter($"input_file".endsWith("tableA.json"))
val tableBDF= df.filter($"input_file".endsWith("tableB.json"))
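Since there can be many table names, the same filter can be applied in a loop over the tableNames list to build a lookup map (a sketch; dfByTable is an illustrative name). Note that loading all files in a single read infers one merged schema, so rows will carry null in columns that belong to other tables:

import org.apache.spark.sql.DataFrame

// One filtered DataFrame per table name, keyed as "tableADF", "tableBDF", ...
val dfByTable: Map[String, DataFrame] = tableNames.map { name =>
  s"${name}DF" -> df.filter($"input_file".endsWith(s"$name.json"))
}.toMap

dfByTable("tableADF").show()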

zhte4eai 2#

Check the following code; the final result type is scala.collection.immutable.Map[String,org.apache.spark.sql.DataFrame] = Map(tableBDF -> [...], tableADF -> [...], tableCDF -> [...]), where ... stands for your column lists.

val dfMap = paths
  .map(path => (s"${path.split("_").last.split("\\.json").head}DF", path)) // parse each file name into a (tableNameDF, path) tuple
  .groupBy(_._1)                       // group paths that share the same table name
  .map(p => p._1 -> p._2.map(_._2))    // keep only the list of paths per table
  .par                                 // .par runs the subsequent loads in parallel
  .map { case (table, tablePaths) =>
    (
      table,                           // table name
      tablePaths.par                   // load this table's files in parallel too
        .map(spark.read.json(_))       // read each JSON file from S3
        .reduce(_ union _)             // union if the same table has multiple files
    )
  }
  .seq                                 // back from the parallel collection to a plain Map
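Binding the expression to a value (dfMap above) lets you look up each table's DataFrame by name afterwards:

val tableADF = dfMap("tableADF") // the unioned DataFrame for all tableA files
tableADF.printSchema()           // each table keeps its own schema
tableADF.show(5)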

9bfwbjaz 3#

If the list of file-name postfixes is long, you can use the following; the code is explained inline:

import org.apache.spark.sql.functions._

object DFByFileName {

  def main(args: Array[String]): Unit = {

    val spark = Constant.getSparkSess // the answerer's own helper that returns a SparkSession

    import spark.implicits._

    //Load your JSON data
    var df = spark.read.format("json").load("s3://data/*.json")

    //Add a column with file name
    df = df.withColumn(
      "input_file", (input_file_name())
    )

    //Extract the unique file postfixes ("tableA", "tableB", ...) from the file names
    val fileGroupList = df.select("input_file").map(row => {
      val fileName = row.getString(0)
      val index1 = fileName.lastIndexOf("_")
      val index2 = fileName.lastIndexOf(".")
      fileName.substring(index1 + 1, index2)
    }).distinct().collect()

    //Build one DataFrame per file group (fileGroupName -> DataFrame of that group)
    fileGroupList.map(fileGroupName => {
      df.filter($"input_file".endsWith(s"${fileGroupName}.json"))
      //perform dataframe operations
    })
  }

}
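If you need the per-group DataFrames to remain addressable afterwards, the final loop inside main could instead collect into a map (a sketch; dfByGroup is an illustrative name):

// Lookup map keyed by file postfix, e.g. dfByGroup("tableA")
val dfByGroup: Map[String, org.apache.spark.sql.DataFrame] =
  fileGroupList.map { fileGroupName =>
    fileGroupName -> df.filter($"input_file".endsWith(s"$fileGroupName.json"))
  }.toMap

dfByGroup("tableA").show(5)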
