我有这个dataframe schema:
root
|-- data: array (nullable = false)
| |-- element: string (containsNull = true)
|-- schema: string (nullable = false)
dataframe看起来像这样:
| 资料|图式|
| --------------|--------------|
| [{“APN”:“Test1”,“DeviceIPAddress”:“Test2”},{“APN”:“Test3”,“DeviceIPAddress”:“Test4”}]|STRUCT〈APN:字符串,设备IP地址:字符串|
基本上,这个dataframe是由一列json字符串数组和另一列对应的schema组成的。
我想分解列表中具有相同模式的每个元素的行。我想得到以下结果:
| APN|设备IP地址|
| --------------|--------------|
| 测试1|测试2|
| 测试3|测试4|
如果我修复了模式并将其作为字符串传递给from_json函数,它可以工作,但我希望它是动态的。
这是我使用的相对错误代码:
df.withColumn("jsonData", explode("data"))\
.withColumn("jsonData", from_json(col("jsonData"), col("schema")))\
.withColumn("jsonData", explode(array("jsonData")))\
.select("jsonData.*")
这是我得到的错误:
pyspark.sql.utils.AnalysisException:Schema应以DDL格式指定为字符串文字或schema_of_json/schema_of_csv函数的输出,而不是“schema”
基本上,当我将col(“schema”)传递给from_json函数时,错误就会发生。如果我将其创建为静态,并将其作为变量传递,它就会工作。有没有一种方法可以将列中的schema传递给from_json函数的schema?
1条答案
按热度按时间2exbekwf1#
尝试使用
expr()
,并通过在此处传递column names
来使用from_json()
函数Example: