spark dataframes:读取具有重复列名但不同数据类型的json

eanckbw9  于 2021-05-29  发布在  Spark
关注(0)|答案(3)|浏览(622)

我有如下json数据,其中version字段是区别点-
file_1 = {"version": 1, "stats": {"hits":20}} file_2 = {"version": 2, "stats": [{"hour":1,"hits":10},{"hour":2,"hits":12}]} 在新的格式中,stats列现在是 Arraytype(StructType) .
以前只需要文件1,所以我使用

spark.read.schema(schema_def_v1).json(path)

现在我需要同时读取这两种类型的多个json文件。我不能在schema_def中将stats定义为字符串,因为这会影响 corruptrecord 特性(用于stats列),用于检查所有字段的格式错误的json和模式符合性。
示例1只读中需要df输出-

version | hour | hits
1       | null | 20
2       | 1    | 10
2       | 2    | 12

我试着和你一起读书 mergeSchema 选项,但这使stats字段成为字符串类型。
此外,我还尝试通过在version字段上过滤并应用 spark.read.schema(schema_def_v1).json(df_v1.toJSON) . 这里stats列也变成 String 类型。
我在想,如果在阅读时,我可以将df列标题解析为 stats_v1 以及 stats_v2 基于数据类型可以解决这个问题。请帮助解决任何可能的问题。

holgip5t

holgip5t1#

iiuc,您可以使用spark.read.text读取json文件,然后解析 value 使用json\u tuple,来自\u json。通知 stats 我们使用的字段 coalesce 基于两个或多个模式分析字段(添加 wholetext=True 作为spark.read.text的参数(如果每个文件包含一个跨多行的json文档)

from pyspark.sql.functions import json_tuple, coalesce, from_json, array

df = spark.read.text("/path/to/all/jsons/")

schema_1 = "array<struct<hour:int,hits:int>>"
schema_2 = "struct<hour:int,hits:int>"

df.select(json_tuple('value', 'version', 'stats').alias('version', 'stats')) \
    .withColumn('status', coalesce(from_json('stats', schema_1), array(from_json('stats', schema_2)))) \
    .selectExpr('version', 'inline_outer(status)') \
    .show()
+-------+----+----+
|version|hour|hits|
+-------+----+----+
|      2|   1|  10|
|      2|   2|  12|
|      1|null|  20|
+-------+----+----+
h9vpoimq

h9vpoimq2#

UDF 要检查字符串或数组,如果是字符串,它会将字符串转换为数组。

import org.apache.spark.sql.functions.udf
import org.json4s.{DefaultFormats, JObject}
import org.json4s.jackson.JsonMethods.parse
import org.json4s.jackson.Serialization.write
import scala.util.{Failure, Success, Try}

object Parse {
    implicit val formats = DefaultFormats
    def toArray(data:String) = {
      val json_data = (parse(data))
      if(json_data.isInstanceOf[JObject]) write(List(json_data)) else data
    }
}

val toJsonArray = udf(Parse.toArray _)
scala> "ls -ltr /tmp/data".!
total 16
-rw-r--r--  1 srinivas  root  37 Jun 26 17:49 file_1.json
-rw-r--r--  1 srinivas  root  69 Jun 26 17:49 file_2.json
res4: Int = 0

scala> val df = spark.read.json("/tmp/data").select("stats","version")
df: org.apache.spark.sql.DataFrame = [stats: string, version: bigint]

scala> df.printSchema
root
 |-- stats: string (nullable = true)
 |-- version: long (nullable = true)
scala> df.show(false)
+-------+-------------------------------------------+
|version|stats                                      |
+-------+-------------------------------------------+
|1      |{"hits":20}                                |
|2      |[{"hour":1,"hits":10},{"hour":2,"hits":12}]|
+-------+-------------------------------------------+

输出

scala> 

import org.apache.spark.sql.types._
val schema = ArrayType(MapType(StringType,IntegerType))

df
.withColumn("json_stats",explode(from_json(toJsonArray($"stats"),schema)))
.select(
    $"version",
    $"stats",
    $"json_stats".getItem("hour").as("hour"),
    $"json_stats".getItem("hits").as("hits")
).show(false)

+-------+-------------------------------------------+----+----+
|version|stats                                      |hour|hits|
+-------+-------------------------------------------+----+----+
|1      |{"hits":20}                                |null|20  |
|2      |[{"hour":1,"hits":10},{"hour":2,"hits":12}]|1   |10  |
|2      |[{"hour":1,"hits":10},{"hour":2,"hits":12}]|2   |12  |
+-------+-------------------------------------------+----+----+

无自定义项

scala> val schema = ArrayType(MapType(StringType,IntegerType))

scala> val expr = when(!$"stats".contains("[{"),concat(lit("["),$"stats",lit("]"))).otherwise($"stats")

df
.withColumn("stats",expr)
.withColumn("stats",explode(from_json($"stats",schema)))
.select(
    $"version",
    $"stats",
    $"stats".getItem("hour").as("hour"),
    $"stats".getItem("hits").as("hits")
)
.show(false)

+-------+-----------------------+----+----+
|version|stats                  |hour|hits|
+-------+-----------------------+----+----+
|1      |[hits -> 20]           |null|20  |
|2      |[hour -> 1, hits -> 10]|1   |10  |
|2      |[hour -> 2, hits -> 12]|2   |12  |
+-------+-----------------------+----+----+
jtw3ybtb

jtw3ybtb3#

先读取第二个文件,分解 stats ,使用架构读取第一个文件。

from pyspark.sql import SparkSession
from pyspark.sql.functions import  explode

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

file_1 = {"version": 1, "stats": {"hits": 20}}

file_2 = {"version": 2, "stats": [{"hour": 1, "hits": 10}, {"hour": 2, "hits": 12}]}

df1 = spark.read.json(sc.parallelize([file_2])).withColumn('stats', explode('stats'))
schema = df1.schema

spark.read.schema(schema).json(sc.parallelize([file_1])).printSchema()

output >> root
 |-- stats: struct (nullable = true)
 |    |-- hits: long (nullable = true)
 |    |-- hour: long (nullable = true)
 |-- version: long (nullable = true)

相关问题