scala - How to read JSON with a schema in Spark DataFrames / Spark SQL?

r7xajy2e · posted 2023-03-12 in Scala
Follow (0) | Answers (2) | Views (144)

Using Spark SQL / DataFrames, please help me out or give me some good suggestions on how to read this JSON:

{
    "billdate":"2016-08-08",
    "accountid":"xxx"
    "accountdetails":{
        "total":"1.1"
        "category":[
        {
            "desc":"one",
            "currentinfo":{
                "value":"10"
            },
            "subcategory":[
            {
                "categoryDesc":"sub",
                "value":"10",
                "currentinfo":{
                    "value":"10"
                }
            }]
        }]
    }
}

Thank you,


bjp0bcyl1#

You can try the following code to read the JSON file based on a schema in Spark 2.2:

import org.apache.spark.sql.types.{DataType, StructType}

// Read a sample JSON file and serialize its inferred schema to JSON
val schema_json = spark.read.json("/user/Files/ActualJson.json").schema.json

// Rebuild a StructType from the serialized schema
val newSchema = DataType.fromJson(schema_json).asInstanceOf[StructType]

// Read the JSON files using that schema
val df = spark.read.schema(newSchema).json("Json_Files/Folder Path")
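
Alternatively, if the structure is known in advance, the schema can be declared explicitly instead of being inferred from a sample file. A minimal sketch using the field names from the question's document (all values assumed to be strings, matching the sample; the input path is a placeholder):

import org.apache.spark.sql.types._

// Explicit schema mirroring the question's JSON structure (assumed types)
val currentInfo = StructType(Seq(StructField("value", StringType)))

val subcategory = StructType(Seq(
  StructField("categoryDesc", StringType),
  StructField("value", StringType),
  StructField("currentinfo", currentInfo)))

val category = StructType(Seq(
  StructField("desc", StringType),
  StructField("currentinfo", currentInfo),
  StructField("subcategory", ArrayType(subcategory))))

val billSchema = StructType(Seq(
  StructField("billdate", StringType),
  StructField("accountid", StringType),
  StructField("accountdetails", StructType(Seq(
    StructField("total", StringType),
    StructField("category", ArrayType(category)))))))

val dfExplicit = spark.read.schema(billSchema).json("Json_Files/Folder Path")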

2w3rbyxf2#

Your JSON seems to be invalid: it is missing the commas after "accountid":"xxx" and after "total":"1.1". You can verify this at http://www.jsoneditoronline.org/
See an-introduction-to-json-support-in-spark-sql.html
If you want to register it as a table, you can register it as below and print its schema:

DataFrame df = sqlContext.read().json("/path/to/validjsonfile").toDF();
df.registerTempTable("df");
df.printSchema();
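
Once registered, the temp table can be queried with Spark SQL. A minimal sketch in Scala, matching the answer's sqlContext-style API (the column names are taken from the question's JSON and are assumptions, not part of the answer):

// Query the temp table registered above; "accountid" and
// "accountdetails.total" are field names assumed from the question
val accounts = sqlContext.sql("SELECT accountid, accountdetails.total FROM df")
accounts.show()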

Here is a sample code snippet:

DataFrame app = df.select("toplevel");
app.registerTempTable("toplevel");
app.printSchema();
app.show();

DataFrame appName = app.select("toplevel.sublevel");
appName.registerTempTable("sublevel");
appName.printSchema();
appName.show();
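
Applied to the question's document, the same drill-down would look like this in Scala (a sketch, assuming the missing commas are fixed and the file has been loaded as df):

// Select the nested struct, then drill into the array inside it,
// using the field names from the question's JSON
val details = df.select("accountdetails")
details.printSchema()
details.show()

val categories = df.select("accountdetails.category")
categories.printSchema()
categories.show()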

Scala example:

{"name":"Michael", "cities":["palo alto", "menlo park"], "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}
{"name":"Andy", "cities":["santa cruz"], "schools":[{"sname":"ucsb", "year":2011}]}
{"name":"Justin", "cities":["portland"], "schools":[{"sname":"berkeley", "year":2014}]}

val people = sqlContext.read.json("people.json")
people: org.apache.spark.sql.DataFrame

Reading top-level fields

val names = people.select('name).collect()
names: Array[org.apache.spark.sql.Row] = Array([Michael], [Andy], [Justin])

names.map(row => row.getString(0))
res88: Array[String] = Array(Michael, Andy, Justin)

Use the select() method to specify the top-level field, collect() to collect it into an Array[Row], and the getString() method to access the column inside each Row.

Flattening and reading a JSON array

Each person has a "cities" array. Let's flatten these arrays and read out all of their elements.

val flattened = people.explode("cities", "city"){c: List[String] => c}
flattened: org.apache.spark.sql.DataFrame

val allCities = flattened.select('city).collect()
allCities: Array[org.apache.spark.sql.Row]

allCities.map(row => row.getString(0))
res92: Array[String] = Array(palo alto, menlo park, santa cruz, portland)

The explode() method expands, or flattens, the cities array into a new column called "city". We then use select() to select the new column, collect() to collect it into an Array[Row], and getString() to access the data inside each Row.
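
Note that DataFrame.explode has been deprecated since Spark 2.0; on newer versions the same flattening can be done with the explode() SQL function. A minimal sketch:

import org.apache.spark.sql.functions.{col, explode}

// Equivalent flattening with the explode() function:
// one output row per element of the cities array
val flattened2 = people.select(explode(col("cities")).as("city"))
flattened2.show()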

Reading an array of nested JSON objects, unflattened

Read the "schools" data, which is an array of nested JSON objects. Each element of the array holds the school name and the year:

val schools = people.select('schools).collect()
schools: Array[org.apache.spark.sql.Row]

val schoolsArr = schools.map(row => row.getSeq[org.apache.spark.sql.Row](0))
schoolsArr: Array[Seq[org.apache.spark.sql.Row]]

schoolsArr.foreach(schools => {
  schools.foreach(row => print((row.getString(0), row.getLong(1))))
  print("\n")
})
(stanford,2010)(berkeley,2012) 
(ucsb,2011) 
(berkeley,2014)

Use select() and collect() to select the "schools" array and collect it into an Array[Row]. Each "schools" array is now of type Seq[Row], so we read it with the getSeq[Row]() method. Finally, we can read each school's information by calling getString() for the school name and getLong() for the school year.
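
Instead of reading Row objects by position, the schools array can also be flattened with explode() and its struct fields read as ordinary columns. A sketch:

import org.apache.spark.sql.functions.{col, explode}

// Flatten the schools array, then project the struct fields;
// one row per school, with sname and year as columns
val schoolsDf = people
  .select(explode(col("schools")).as("school"))
  .select(col("school.sname"), col("school.year"))
schoolsDf.show()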
