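I am just starting to learn PySpark, and I am stuck transforming JSON documents that I fetch from within a DataFrame. Unlike my sample data, the real initial DataFrame has more than 2 rows.
My initial DataFrame is here: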
df = spark.createDataFrame(["A", "B"], StringType()).toDF("id")
display(df)
The function I call from within the DataFrame looks like this:
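import json
import requests
from pyspark.sql.functions import udf

def getObjectInformation(id):
    normalized_data = dict()
    # Fetch the document for this id and keep only its 'value' field
    theJSONresponse = requests.get("https://someone.somewhere/" + id).json()['value']
    # Re-serialize to a JSON string so the UDF returns a plain string column
    theJSONresponse_dumps = json.dumps(theJSONresponse)
    normalized_data["_data"] = theJSONresponse_dumps
    return normalized_data["_data"]

udf_getObjectInformation = udf(lambda x: getObjectInformation(x))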
I call the function from within the DataFrame:
df_oid = df.select('id').withColumn('oid', udf_getObjectInformation(df.id))
These are the JSON documents for id=A and id=B:
#normalized_data["_data"] for id = A
[{"oid": "1", "id": "A", "type": "this", "oDetails": "{\"c\":\"blue\",\"p\":\"fruit\"}"},
{"oid": "2", "id": "A", "type": "this", "oDetails": "{\"c\":\"red\",\"p\":\"book\"}"},
{"oid": "3", "id": "A", "type": "that", "oDetails": "{\"c\":\"green\",\"p\":\"book\"}"}]
#normalized_data["_data"] for id=B
[{"oid": "57", "id": "B", "type": "this", "oDetails": "{\"c\":\"blue\",\"p\":\"fruit\"}"},
{"oid": "59", "id": "B", "type": "that", "oDetails": "{\"c\":\"blue\",\"p\":\"shirt\"}"}]
Now my struggle begins...
I would like my final DataFrame to look like this:
data = [
    ("A","1","this","blue","fruit"),
    ("A","2","this","red","book"),
    ("A","3","this","green" ,"book"),
    ("B","57","this","blue","fruit"),
    ("B","59","something","blue", "shirt")
]
schema = StructType([
    StructField("id",StringType(),True),
    StructField("oid",StringType(),True),
    StructField("type",StringType(),True),
    StructField("c", StringType(), True),
    StructField("p", StringType(), True)
])
df_final = spark.createDataFrame(data=data,schema=schema)
Any hints, guidance, or solutions are greatly appreciated.
1 Answer
kr98yfug1#
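Your input is a JSON string that itself contains another JSON string (the oDetails field). You can parse the outer JSON, then the inner one, and apply the necessary transformations, for example with pyspark>=3.4.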