I am new to PySpark and I am trying to flatten a nested JSON file, but I don't want the values duplicated. Here is my JSON file:
{
"events": [
{
"event_name": "start",
"event_properties": ["property1", "property2", "property3"],
"entities": ["entityI", "entityII", "entityIII"],
"event_timestamp": "2022-05-01 00:00:00"
},
{
"event_name": "stop",
"event_properties": ["propertyA", "propertyB", "propertyC", "propertyD"],
"entities": ["entityW", "entityX", "entityY", "entityZ"],
"event_timestamp": "2022-05-01 01:00:00"
}
]
}
The output I want:
event_name|event_properties|entities|event_timestamp
start|property1|entityI|2022-05-01 00:00:00
start|property2|entityII|2022-05-01 00:00:00
start|property3|entityIII|2022-05-01 00:00:00
stop|propertyA|entityW|2022-05-01 01:00:00
stop|propertyB|entityX|2022-05-01 01:00:00
stop|propertyC|entityY|2022-05-01 01:00:00
stop|propertyD|entityZ|2022-05-01 01:00:00
The code I tried:
# Importing package
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, DoubleType
# Implementing JSON File in PySpark
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("PySpark Read JSON") \
    .getOrCreate()
df = spark.read.option("multiline","true").json(r"C:\Users\Lajo\Downloads\spark_ex1_input.json")
from pyspark.sql.types import *
from pyspark.sql.functions import explode_outer,col
def flatten(df):
    # compute complex fields (lists and structs) in the schema
    complex_fields = dict([(field.name, field.dataType)
                           for field in df.schema.fields
                           if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    while len(complex_fields) != 0:
        col_name = list(complex_fields.keys())[0]
        print("Processing :" + col_name + " Type : " + str(type(complex_fields[col_name])))

        # if StructType, convert each sub-element to a column,
        # i.e. flatten structs
        if type(complex_fields[col_name]) == StructType:
            expanded = [col(col_name + '.' + k).alias(col_name + '_' + k)
                        for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)

        # if ArrayType, add the array elements as rows using explode,
        # i.e. explode arrays
        elif type(complex_fields[col_name]) == ArrayType:
            df = df.withColumn(col_name, explode_outer(col_name))

        # recompute the remaining complex fields in the schema
        complex_fields = dict([(field.name, field.dataType)
                               for field in df.schema.fields
                               if type(field.dataType) == ArrayType or type(field.dataType) == StructType])
    return df
df_flatten = flatten(df)
df_flatten.show()
1 Answer
You can do it like this. The code is self-explanatory; you can check the documentation of the functions used.
Here is the output.