如何在Spark SQL中推断序列化JSON列的模式?

tez616oj  于 2023-10-21  发布在  Spark
关注(0)|答案(4)|浏览(141)

我有一个表,其中有1列是序列化的JSON。我想在这个JSON列上应用模式推断。我不知道要作为JSON提取的输入传递的模式(例如:from_json函数)。
我可以在Scala中这样做,

val contextSchema = spark.read.json(data.select("context").as[String]).schema
val updatedData = data.withColumn("context", from_json(col("context"), contextSchema))

如何将此解决方案转换为纯Spark-SQL?

p1tboqfb

p1tboqfb1#

对于spark-sql,使用**toDDL生成模式,然后使用from_json**中的模式。

Example:

df.show(10,false)
//+---+-------------------+
//|seq|json               |
//+---+-------------------+
//|1  |{"id":1,"name":"a"}|
//+---+-------------------+

val sch=spark.read.json(df.select("json").as[String]).schema.toDDL
//sch: String = `id` BIGINT,`name` STRING

df.createOrReplaceTempView("tmp")

spark.sql(s"""select seq,jsn.* from (select *,from_json(json,"$sch") as jsn  from tmp)""").
show(10,false)
//+---+---+----+
//|seq|id |name|
//+---+---+----+
//|1  |1  |a   |
//+---+---+----+
vbopmzt1

vbopmzt12#

可以使用schema_of_json()函数来推断JSON模式。

select from_json(<column_name>, schema_of_json(<sample_JSON>)) from <table>
2q5ifsrm

2q5ifsrm3#

我找到了一个变通办法,
1.将其转换为RDD并使用spark框架读取

spark
  .read
  .option("inferSchema", True)
  .json(
      df.rdd.map(
          lambda rec: rec.context
      )
  )

1.如果字段/路径事先已知,我们可以使用

df.select(json_tuple(col("context"),"<json path or attribute>").alias("field_name")).show()
sd2nnvve

sd2nnvve4#

你可以试试这个:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("JsonInferSchema").getOrCreate()

data = spark.createDataFrame([
    (1, '{"name":"John", "age":30}'),
    (2, '{"name":"Doe", "age":25}')
], ["id", "context"])

data.createOrReplaceTempView("tempTable")

query = """
SELECT 
    id, 
    context,
    get_json_object(context, '$.name') as name,
    get_json_object(context, '$.age') as age
FROM tempTable
"""

updatedData = spark.sql(query)

updatedData.show()

相关问题