How to merge two JSON results into a single DataFrame in Scala Spark

vvppvyoh · posted 2021-05-27 in Spark

I have some code that takes an array of JSON file locations and reads each one with Spark's read JSON function, but I get the separate outputs shown below, one per location.
Output of the code below:

+------+---+
|name  |age|
+------+---+
|John  |24 |
|Cammy |20 |
|Britto|30 |
+------+---+

+------+---+
|name  |age|
+------+---+
|George|23 |
|Mikle |15 |
+------+---+

Code:

val dataLocation = dataArr(counter)("data").arr
if (dataArr(counter)("type").value.toString() == "json") {
  dataLocation.foreach(i => {
    // Read each file whole as (path, content) so multi-line JSON objects stay intact
    val rdd = spark.sparkContext.wholeTextFiles(i.str)
    // Join the concatenated JSON objects into a single JSON array string
    val jsonFormat = rdd.map(f => "[" + f._2.replaceAll("\\}.*\n{0,}.*\\{", "},{") + "]")
    spark.read.schema(Schema.getSchema(name)).option("multiLine", true).json(jsonFormat).show()
  })
}
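For reference, the wholeTextFiles plus replaceAll step turns a file of concatenated JSON objects into one JSON array string that spark.read.json can parse. A minimal illustration of just that string transformation (the sample data is made up for the example):

val raw =
  """{"name":"John","age":24}
    |{"name":"Cammy","age":20}""".stripMargin

// "}" + newline + "{" between objects becomes "},{", and the whole
// string is wrapped in brackets to form a valid JSON array.
val asArray = "[" + raw.replaceAll("\\}.*\n{0,}.*\\{", "},{") + "]"
// asArray == """[{"name":"John","age":24},{"name":"Cammy","age":20}]"""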

But how do I change the code above to get the expected output below, i.e., all of the rows combined into a single table/DataFrame?

Expected output:
    +------+---+
    |name  |age|
    +------+---+
    |John  |24 |
    |Cammy |20 |
    |Britto|30 |
    |George|23 |
    |Mikle |15 |
    +------+---+

My dataLocation array comes from a config that looks like this:

{
    "source": [
        {
            "name": "testPersons",
            "data": [
                "E:\\dataset\\2020-05-01\\",
                "E:\\dataset\\2020-05-02\\"
            ],
            "type": "json"
        },
        {
            "name": "testPets",
            "data": [
                "E:\\dataset\\2020-05-01\\078\\",
                "E:\\dataset\\2020-05-02\\078\\"
            ],
            "type": "json"
        }
    ]
}
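(For context, the dataArr(counter)("data").arr accessor in the snippets suggests this config is parsed with the uJson library; a minimal sketch of that assumption, with config.json as a hypothetical file name:)

import scala.io.Source

// Hypothetical: load the config shown above and pull out the "source" array
val configText = Source.fromFile("config.json").mkString
val dataArr = ujson.read(configText)("source").arr
// dataArr(counter)("data").arr then yields the list of path values for one entry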

zkure5ic1#

Instead of foreach, do a map operation. That way you get a list of DataFrames (i.e., one DF per JSON location).
Then simply union them:

// After map rather than foreach
val dflist: List[DataFrame] = ???

In your case it could look like the following. (I'm using var because I don't want to rewrite your complete code or deviate too far from what you've already written... Scala has no global-variable syntax.)

import org.apache.spark.sql.{DataFrame, Row}

var dflist: List[DataFrame] = List[DataFrame]()

val dataLocation = dataArr(counter)("data").arr
if (dataArr(counter)("type").value.toString() == "json") {
  // map instead of foreach, so each location yields a DataFrame
  dflist = dataLocation.map(i => {
    val rdd = spark.sparkContext.wholeTextFiles(i.str)
    val jsonFormat = rdd.map(f => "[" + f._2.replaceAll("\\}.*\n{0,}.*\\{", "},{") + "]")
    spark.read.schema(Schema.getSchema(name)).option("multiLine", true).json(jsonFormat)
  }).toList
}

// Create an empty DataFrame with the same schema as the zero value for foldLeft
val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], Schema.getSchema(name))

val finalDF = dflist.foldLeft(emptyDF)((x, y) => x.union(y))
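As a side note, if every location is guaranteed to produce a DataFrame, the var and the empty zero-value DataFrame can be dropped entirely; a minimal alternative sketch, assuming the same spark, dataArr, counter, and Schema.getSchema(name) as above:

// Build the list functionally and union in one pass; reduce is safe here
// because each location contributes exactly one DataFrame.
val finalDF = dataArr(counter)("data").arr.toList
  .map { loc =>
    val rdd = spark.sparkContext.wholeTextFiles(loc.str)
    val jsonFormat = rdd.map(f => "[" + f._2.replaceAll("\\}.*\n{0,}.*\\{", "},{") + "]")
    spark.read.schema(Schema.getSchema(name)).option("multiLine", true).json(jsonFormat)
  }
  .reduce(_ union _)

finalDF.show()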
