在循环中使用scala中的字符串列表中的自定义名称创建dataframe

z6psavjg  于 2021-07-14  发布在  Java
关注(0)|答案(2)|浏览(289)

嘿,我有一个典型的需求,我必须在一个循环中使用scala中的字符串列表中的自定义名称创建dataframes。
就像我有一个字符串列表,比如说(product,customer,order,…),这个列表可以有n个条目,其中n可以是任何数字,比如说30。列表中每个项目的列也在另一个文件中指定。
因此,对于列表中的每个项目,例如product,我必须创建dataframe name作为product,稍后我需要编写sparksql,将列表中的所有项目连接起来,如下所示。
从product join customer中选择product.name、customer.name、order.name。。。加入订单。。。
这个连接查询将根据列表中的项目数进行动态查询。我正在考虑从shell脚本创建.scala文件。让我知道你的建议。

niknxzdl

niknxzdl1#

动态sql也可以从scala对象创建。根据用户输入,首先创建一个rdd。然后根据您的需求创建一个预期对象的列表,并创建一个Dataframe和对象名的Map。然后使用循环生成sql字符串。

ghhaqwfi

ghhaqwfi2#

嘿,我通过创建一个名为generatedf的方法实现了这一点,如下所示。这是将文件列表作为字符串(“,”分隔)和模式定义(“,”)分隔的文件,最后是将包含数据的文件。

def generateDF(fName: String, schemaFile: String, dataFile: String): Unit = {
// Reading the prod files and creating DataFrame from user defined schema
val SchemaRDD = spark.sparkContext.textFile(schemaFile)
val SchemaString = SchemaRDD.map(_.toString).collect().mkString
val Schema = StructType(SchemaString.split(",").map(column => StructField(column.split(":")(0), inferType(column), true)))
val outDF = spark.read.format("csv")
  .option("delimiter", ",").option("quote", "")
  .option("header", "false")
  .schema(Schema)
  .load(dataFile)
outDF.createTempView(fName)

}
//为源文件中的每个表名调用过程

fileListRDD
      .flatMap(_.split(",")).collect.toList
      .map(file => generateDF(file.mkString.toString, (filePath + file.mkString + ".schema"), (filePath + file.mkString + ".csv")))

相关问题