PySpark - flatten a JSON file

wko9yo5t · asked on 2023-10-15 · Spark

I am new to PySpark and I am trying to flatten a nested JSON file without duplicating the values. Here is my JSON file:

{
    "events": [
        {
            "event_name": "start",
            "event_properties": ["property1", "property2", "property3"],
            "entities": ["entityI", "entityII", "entityIII"],
            "event_timestamp": "2022-05-01 00:00:00"
        },
        {
            "event_name": "stop",
            "event_properties": ["propertyA", "propertyB", "propertyC", "propertyD"],
            "entities": ["entityW", "entityX", "entityY", "entityZ"],
            "event_timestamp": "2022-05-01 01:00:00"
        }
    ]
}

My desired output:

event_name|event_properties|entities |event_timestamp
start     |property1       |entityI  |2022-05-01 00:00:00
start     |property2       |entityII |2022-05-01 00:00:00
start     |property3       |entityIII|2022-05-01 00:00:00
stop      |propertyA       |entityW  |2022-05-01 01:00:00
stop      |propertyB       |entityX  |2022-05-01 01:00:00
stop      |propertyC       |entityY  |2022-05-01 01:00:00
stop      |propertyD       |entityZ  |2022-05-01 01:00:00
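Note that the pairing is positional: the i-th element of event_properties belongs with the i-th element of entities, so each event should produce exactly as many output rows as it has properties.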
The code I tried:

# Importing package
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,BooleanType,DoubleType

# Implementing JSON File in PySpark

spark = SparkSession.builder \
    .master("local[1]") \
    .appName("PySpark Read JSON") \
    .getOrCreate()
    
df = spark.read.option("multiline","true").json(r"C:\Users\Lajo\Downloads\spark_ex1_input.json")

from pyspark.sql.types import *
from pyspark.sql.functions import explode_outer,col

def flatten(df):
   # compute Complex Fields (Lists and Structs) in Schema   
   complex_fields = dict([(field.name, field.dataType)
                             for field in df.schema.fields
                             if type(field.dataType) == ArrayType or  type(field.dataType) == StructType])
   while len(complex_fields)!=0:
      col_name=list(complex_fields.keys())[0]
      print ("Processing :"+col_name+" Type : "+str(type(complex_fields[col_name])))
    
      # if StructType then convert all sub element to columns.
      # i.e. flatten structs
      if (type(complex_fields[col_name]) == StructType):
         expanded = [col(col_name+'.'+k).alias(col_name+'_'+k) for k in [ n.name for n in  complex_fields[col_name]]]
         df=df.select("*", *expanded).drop(col_name)
    
      # if ArrayType then add the Array Elements as Rows using the explode function
      # i.e. explode Arrays
      elif (type(complex_fields[col_name]) == ArrayType):    
         df=df.withColumn(col_name,explode_outer(col_name))
    
      # recompute remaining Complex Fields in Schema       
      complex_fields = dict([(field.name, field.dataType)
                             for field in df.schema.fields
                             if type(field.dataType) == ArrayType or  type(field.dataType) == StructType])
   return df
   
   
df_flatten = flatten(df)

df_flatten.show()
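This runs, but it is also why the values get repeated: the generic flatten explodes event_properties and entities independently, so each event yields the Cartesian product of the two arrays (3 × 3 = 9 rows for start, 4 × 4 = 16 for stop) rather than the aligned pairs. A minimal sketch of the effect on hypothetical toy data (not the file above):

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode

spark = SparkSession.builder.master("local[1]").appName("demo").getOrCreate()

toy = spark.createDataFrame(
    [("start", ["p1", "p2"], ["e1", "e2"])],
    ["event_name", "event_properties", "entities"],
)

# Two independent explodes cross-join the array elements:
toy.withColumn("event_properties", explode("event_properties")) \
   .withColumn("entities", explode("entities")) \
   .show()
# 4 rows (p1/e1, p1/e2, p2/e1, p2/e2) instead of the 2 aligned pairs.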

Answer 1 (t40tm48m)

You can do it as follows. The code is mostly self-explanatory; see the documentation of each function used.

from pyspark.sql.types import *
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("flatten-events").getOrCreate()

inputFile = "../data/jsonformatdata.json"

# Explicit schema. event_timestamp is read as a string and parsed with
# to_timestamp below; reading it as DateType would drop the time of day.
eventSchema = StructType([
    StructField('events',
                ArrayType(
                    StructType([
                        StructField('entities', ArrayType(StringType(), True), True),
                        StructField('event_name', StringType(), True),
                        StructField('event_properties', ArrayType(StringType(), True), True),
                        StructField('event_timestamp', StringType(), True)]), True), True)])

initialDF = spark.read.schema(eventSchema).option("multiline", "true").json(inputFile)
initialDF.show(n=100, truncate=False)
# One row per event: explode the top-level events array.
initialDF = initialDF.select(F.explode(F.col("events")).alias("temp_alias"))
initialDF.show(n=100, truncate=False)

# Promote the struct fields to top-level columns and parse the timestamp string.
ans_df = initialDF.withColumn("event_name", F.col("temp_alias.event_name"))
ans_df = ans_df.withColumn("event_properties", F.col("temp_alias.event_properties"))
ans_df = ans_df.withColumn("entities", F.col("temp_alias.entities"))
ans_df = ans_df.withColumn("event_timestamp", F.to_timestamp(F.col("temp_alias.event_timestamp"), "yyyy-MM-dd HH:mm:ss"))

ans_df = ans_df.drop("temp_alias")
ans_df.show(n=100, truncate=False)

# Build a map pairing the i-th property with the i-th entity.
ans_df = ans_df.withColumn('mapped', F.map_from_arrays('event_properties', 'entities'))
ans_df.show(n=100, truncate=False)

# explode on a map column emits two columns (key, value); the two-name alias
# renames them to event_properties and entities.
ans_df = ans_df.select("event_name", F.explode('mapped').alias('event_properties', 'entities'), "event_timestamp")
ans_df.show(n=100, truncate=False)

Here is the output.

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|events                                                                                                                                                                                                              |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[{[entityI, entityII, entityIII], start, [property1, property2, property3], 2022-05-01 00:00:00}, {[entityW, entityX, entityY, entityZ], stop, [propertyA, propertyB, propertyC, propertyD], 2022-05-01 01:00:00}]|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

+---------------------------------------------------------------------------------------------------------------+
|temp_alias                                                                                                     |
+---------------------------------------------------------------------------------------------------------------+
|{[entityI, entityII, entityIII], start, [property1, property2, property3], 2022-05-01 00:00:00}                |
|{[entityW, entityX, entityY, entityZ], stop, [propertyA, propertyB, propertyC, propertyD], 2022-05-01 01:00:00}|
+---------------------------------------------------------------------------------------------------------------+

+----------+--------------------------------------------+------------------------------------+-------------------+
|event_name|event_properties                            |entities                            |event_timestamp    |
+----------+--------------------------------------------+------------------------------------+-------------------+
|start     |[property1, property2, property3]           |[entityI, entityII, entityIII]      |2022-05-01 00:00:00|
|stop      |[propertyA, propertyB, propertyC, propertyD]|[entityW, entityX, entityY, entityZ]|2022-05-01 01:00:00|
+----------+--------------------------------------------+------------------------------------+-------------------+

+----------+--------------------------------------------+------------------------------------+-------------------+----------------------------------------------------------------------------------------+
|event_name|event_properties                            |entities                            |event_timestamp    |mapped                                                                                  |
+----------+--------------------------------------------+------------------------------------+-------------------+----------------------------------------------------------------------------------------+
|start     |[property1, property2, property3]           |[entityI, entityII, entityIII]      |2022-05-01 00:00:00|{property1 -> entityI, property2 -> entityII, property3 -> entityIII}                   |
|stop      |[propertyA, propertyB, propertyC, propertyD]|[entityW, entityX, entityY, entityZ]|2022-05-01 01:00:00|{propertyA -> entityW, propertyB -> entityX, propertyC -> entityY, propertyD -> entityZ}|
+----------+--------------------------------------------+------------------------------------+-------------------+----------------------------------------------------------------------------------------+

+----------+----------------+---------+-------------------+
|event_name|event_properties|entities |event_timestamp    |
+----------+----------------+---------+-------------------+
|start     |property1       |entityI  |2022-05-01 00:00:00|
|start     |property2       |entityII |2022-05-01 00:00:00|
|start     |property3       |entityIII|2022-05-01 00:00:00|
|stop      |propertyA       |entityW  |2022-05-01 01:00:00|
|stop      |propertyB       |entityX  |2022-05-01 01:00:00|
|stop      |propertyC       |entityY  |2022-05-01 01:00:00|
|stop      |propertyD       |entityZ  |2022-05-01 01:00:00|
+----------+----------------+---------+-------------------+
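A caveat with the map_from_arrays approach: map keys must be unique, so with default settings Spark 3 raises an error if an event repeats a property name (see spark.sql.mapKeyDedupPolicy), and the two arrays must be the same length. A positional alternative that tolerates duplicate names is arrays_zip plus a single explode. Here is a minimal sketch, assuming ans_df is the DataFrame from just before the map_from_arrays step:

import pyspark.sql.functions as F

# Pair the i-th property with the i-th entity, then explode the pairs once.
zipped = ans_df.withColumn(
    "pair", F.explode(F.arrays_zip("event_properties", "entities"))
)
result = zipped.select(
    "event_name",
    F.col("pair.event_properties").alias("event_properties"),
    F.col("pair.entities").alias("entities"),
    "event_timestamp",
)
result.show(truncate=False)

Unlike map_from_arrays, arrays_zip pads the shorter array with nulls rather than failing when the lengths differ.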
