python 如何将存储在列中的动态JSON模式传递给from_json

fhity93d  于 2023-04-19  发布在  Python
关注(0)|答案(1)|浏览(115)

我有这个dataframe schema:

root
 |-- data: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- schema: string (nullable = false)

dataframe看起来像这样:
| 资料|图式|
| --------------|--------------|
| [{“APN”:“Test1”,“DeviceIPAddress”:“Test2”},{“APN”:“Test3”,“DeviceIPAddress”:“Test4”}]|STRUCT〈APN:字符串,设备IP地址:字符串|
基本上,这个dataframe是由一列json字符串数组和另一列对应的schema组成的。
我想分解列表中具有相同模式的每个元素的行。我想得到以下结果:
| APN|设备IP地址|
| --------------|--------------|
| 测试1|测试2|
| 测试3|测试4|
如果我修复了模式并将其作为字符串传递给from_json函数,它可以工作,但我希望它是动态的。
这是我使用的相对错误代码:

df.withColumn("jsonData", explode("data"))\
        .withColumn("jsonData", from_json(col("jsonData"), col("schema")))\
        .withColumn("jsonData", explode(array("jsonData")))\
        .select("jsonData.*")

这是我得到的错误:
pyspark.sql.utils.AnalysisException:Schema应以DDL格式指定为字符串文字或schema_of_json/schema_of_csv函数的输出,而不是“schema”
基本上,当我将col(“schema”)传递给from_json函数时,错误就会发生。如果我将其创建为静态,并将其作为变量传递,它就会工作。有没有一种方法可以将列中的schema传递给from_json函数的schema?

2exbekwf

2exbekwf1#

尝试使用expr(),并通过在此处传递column names来使用from_json()函数

Example:

df.withColumn("jsonData", explode("data"))\
        .withColumn("jsonData", expr('from_json(jsonData,schema)'))\
        .withColumn("jsonData", explode(array("jsonData")))\
        .select("jsonData.*")

相关问题