Creating deeply nested JSON with PySpark

vaqhlq81 posted on 2023-05-08 in Spark

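I'm trying to create a nested JSON file from a DataFrame, but I can't convert it to Pandas. Also, the values in the arrays should not be duplicated.
Sample data: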

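+-------+---------+--------------+-------------+-------+------------+-----+------------+----------+--------+--------+-----+----------------+------------+--------------------+
|PROD_ID|PROD_NAME|USAGE         |TYPE_PROD    |PART_ID|MATERIAL    |GRADE|MACHINE_NAME|DATE_TIME |PROCESS |LENGTH_S|WIDTH|LOCATION_ON_TOOL|PROCESS_NAME|MATERIAL_TEMPERATURE|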
+-------+---------+--------------+-------------+-------+------------+-----+------------+----------+--------+--------+-----+----------------+------------+--------------------+
|1      |SHOE     |TO WALK AROUND|DAILY TRAINER|AA     |CARBON PLATE|A    |ABC123      |2020-05-28|ROLLING |17      |10   |[-1, -1]        |ROLLING     |201                 |
|1      |SHOE     |TO WALK AROUND|DAILY TRAINER|AA     |CARBON PLATE|A    |ABC123      |2020-05-28|STAMPING|17      |10   |[20, 3]         |STAMPING    |301                 |
|1      |SHOE     |TO WALK AROUND|DAILY TRAINER|AA     |CARBON PLATE|A    |ABC123      |2020-05-28|CASTING |17      |10   |[20, 3]         |CASTING     |900                 |
|1      |SHOE     |TO WALK AROUND|DAILY TRAINER|AA     |CARBON PLATE|A    |ABC123      |2020-05-28|CASTING |17      |10   |[20, 3]         |CASTING     |532                 |
|1      |SHOE     |TO WALK AROUND|DAILY TRAINER|AA     |CARBON PLATE|A    |ABC123      |2020-05-28|CASTING |17      |10   |[90, 100]       |CASTING     |532                 |
+-------+---------+--------------+-------------+-------+------------+-----+------------+----------+--------+--------+-----+----------------+------------+--------------------+

Expected output:

{
    "PRODS": {
        "Prod_ID": "1",
        "Prod_Name": "SHOE",
        "Usage": "TO_WALK_AROUND",
        "PART_ID": [
            {
                "Part_ID":"AA",
                "Material":"CARBON PLATE",
                "Grade":"A",
                "MANUFACTURE": [
                    {
                        "Machine_Name": "ABC123",
                        "Date_Time": "2020-05-28",
                        "Process": "Rolling"
                    },
                    {
                        "Machine_Name": "ABC123",
                        "Date_Time": "2020-05-28",
                        "Process": "Stamping"
                    },
                    {
                        "Machine_Name": "ABC123",
                        "Date_Time": "2020-05-28",
                        "Process": "Casting"
                    }
                ],
                "DIMENSION": [
                    {
                        "Length":17,
                        "Width": 10
                    }
                ],
                "PARAMETER": [
                    {
                        "Location_On_Tool": [-1,-1],
                        "MEASUREMENT_DATA": [
                            {
                                "Process_Name": "Rolling",
                                "Material_Temperature": [201]
                            }
                        ]
                    },
                    {
                        "Location_On_Tool": [20,3],
                        "MEASUREMENT_DATA": [
                            {
                                "Process_Name": "Stamping",
                                "Material_Temperature": [301]
                            },
                            {
                                "Process_Name": "Casting",
                                "Material_Temperature": [532]
                            }
                        ]
                    },
                    {
                        "Location_On_Tool": [90,100],
                        "MEASUREMENT_DATA": [
                            {
                                "Process_Name": "Casting",
                                "Material_Temperature": [532]
                            }
                        ]
                    }
                ]
            }
        ]
    }
}

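As you can see in the JSON, the values of Machine_Name and Date_Time in the MANUFACTURE array are the same, but because Process differs, each different Process creates another element in the array. The same applies to the PARAMETER array.
I've tried collect_list, struct, agg, and groupBy, but I still can't get the exact JSON I want.
P.S. I'll post my code that didn't work.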
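The deduplication rule described in the question can be stated precisely with a small plain-Python sketch over the five sample rows. It uses no Spark or Pandas, so it only serves as a reference for what the Spark job should produce. The tuple layout and the helper name `distinct_in_order` are illustrative assumptions: MANUFACTURE keeps one element per distinct (Machine_Name, Date_Time, Process) combination, and PARAMETER groups rows by Location_On_Tool and then by Process_Name, collecting distinct temperatures.

```python
# Reference model of the nesting rule in plain Python (illustrative only;
# the tuple layout and helper name are assumptions, not part of any Spark API).

# (MACHINE_NAME, DATE_TIME, PROCESS, LOCATION_ON_TOOL, PROCESS_NAME, MATERIAL_TEMPERATURE)
rows = [
    ("ABC123", "2020-05-28", "ROLLING", (-1, -1), "ROLLING", 201),
    ("ABC123", "2020-05-28", "STAMPING", (20, 3), "STAMPING", 301),
    ("ABC123", "2020-05-28", "CASTING", (20, 3), "CASTING", 900),
    ("ABC123", "2020-05-28", "CASTING", (20, 3), "CASTING", 532),
    ("ABC123", "2020-05-28", "CASTING", (90, 100), "CASTING", 532),
]


def distinct_in_order(items):
    """Drop repeated items, keeping first-seen order (dicts preserve insertion order)."""
    return list(dict.fromkeys(items))


# MANUFACTURE: one element per distinct (machine, date, process) combination
manufacture = [
    {"Machine_Name": m, "Date_Time": d, "Process": p}
    for m, d, p in distinct_in_order((r[0], r[1], r[2]) for r in rows)
]

# PARAMETER: group by location, then by process name; temperatures are kept distinct
by_location = {}
for _, _, _, location, process, temperature in rows:
    temps = by_location.setdefault(location, {}).setdefault(process, [])
    if temperature not in temps:
        temps.append(temperature)

parameter = [
    {
        "Location_On_Tool": list(location),
        "MEASUREMENT_DATA": [
            {"Process_Name": process, "Material_Temperature": temps}
            for process, temps in processes.items()
        ],
    }
    for location, processes in by_location.items()
]

print(len(manufacture), len(parameter))
```

On the sample data this yields three MANUFACTURE elements and three PARAMETER elements. For location [20, 3] the Casting entry comes out as [900, 532], because the sample data contains two distinct Casting temperatures at that location, whereas the expected output lists only 532.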

Answer #1 by yqhsw0fo

You can use the following code to create the required nested JSON structure:

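from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, collect_set, struct

# Initialize Spark session
spark = SparkSession.builder.master("local").appName("Nested JSON").getOrCreate()

# Input data: the sample rows from the question. LOCATION_ON_TOOL is a real array, not a string.
data = [
    (1, "SHOE", "TO WALK AROUND", "DAILY TRAINER", "AA", "CARBON PLATE", "A", "ABC123", "2020-05-28", "ROLLING", 17, 10, [-1, -1], "ROLLING", 201),
    (1, "SHOE", "TO WALK AROUND", "DAILY TRAINER", "AA", "CARBON PLATE", "A", "ABC123", "2020-05-28", "STAMPING", 17, 10, [20, 3], "STAMPING", 301),
    (1, "SHOE", "TO WALK AROUND", "DAILY TRAINER", "AA", "CARBON PLATE", "A", "ABC123", "2020-05-28", "CASTING", 17, 10, [20, 3], "CASTING", 900),
    (1, "SHOE", "TO WALK AROUND", "DAILY TRAINER", "AA", "CARBON PLATE", "A", "ABC123", "2020-05-28", "CASTING", 17, 10, [20, 3], "CASTING", 532),
    (1, "SHOE", "TO WALK AROUND", "DAILY TRAINER", "AA", "CARBON PLATE", "A", "ABC123", "2020-05-28", "CASTING", 17, 10, [90, 100], "CASTING", 532),
]

columns = ["PROD_ID", "PROD_NAME", "USAGE", "TYPE_PROD", "PART_ID", "MATERIAL", "GRADE", "MACHINE_NAME", "DATE_TIME", "PROCESS", "LENGTH_S", "WIDTH", "LOCATION_ON_TOOL", "PROCESS_NAME", "MATERIAL_TEMPERATURE"]

df = spark.createDataFrame(data, columns)

# Columns that identify one element of the PART_ID array
part_keys = ["PROD_ID", "PROD_NAME", "USAGE", "PART_ID", "MATERIAL", "GRADE"]

# Spark does not allow an aggregate inside another aggregate (collect_list(... collect_list(...))),
# so each nesting level gets its own groupBy, starting from the innermost one.

# MEASUREMENT_DATA level: distinct temperatures per part, location and process
measurements = df.groupBy(*part_keys, "LOCATION_ON_TOOL", "PROCESS_NAME").agg(
    collect_set("MATERIAL_TEMPERATURE").alias("Material_Temperature")
)

# PARAMETER level: one element per distinct location, holding its MEASUREMENT_DATA array
parameter = (
    measurements.groupBy(*part_keys, "LOCATION_ON_TOOL")
    .agg(collect_list(struct(col("PROCESS_NAME").alias("Process_Name"), "Material_Temperature")).alias("MEASUREMENT_DATA"))
    .groupBy(*part_keys)
    .agg(collect_list(struct(col("LOCATION_ON_TOOL").alias("Location_On_Tool"), "MEASUREMENT_DATA")).alias("PARAMETER"))
)

# MANUFACTURE and DIMENSION: collect_set drops duplicate structs
manufacture_dimension = df.groupBy(*part_keys).agg(
    collect_set(struct(col("MACHINE_NAME").alias("Machine_Name"), col("DATE_TIME").alias("Date_Time"), col("PROCESS").alias("Process"))).alias("MANUFACTURE"),
    collect_set(struct(col("LENGTH_S").alias("Length"), col("WIDTH").alias("Width"))).alias("DIMENSION"),
)

# PART_ID and PRODS levels: one PART_ID element per part, wrapped in a PRODS struct per product
df_final = (
    manufacture_dimension.join(parameter, on=part_keys)
    .groupBy("PROD_ID", "PROD_NAME", "USAGE")
    .agg(collect_list(struct(col("PART_ID").alias("Part_ID"), col("MATERIAL").alias("Material"), col("GRADE").alias("Grade"), "MANUFACTURE", "DIMENSION", "PARAMETER")).alias("PART_ID"))
    .select(struct(col("PROD_ID").alias("Prod_ID"), col("PROD_NAME").alias("Prod_Name"), col("USAGE").alias("Usage"), "PART_ID").alias("PRODS"))
)

# Convert DataFrame to JSON: one string per product. Element order inside the arrays is not guaranteed.
json_output = df_final.toJSON().collect()

# Print JSON output
print(json_output[0])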
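Since the question requires that no array repeat a value, the serialized result can also be checked directly. This is a minimal sketch, assuming each element of `json_output` is one JSON document (which is what `DataFrame.toJSON()` returns). `find_duplicate_arrays` is a hypothetical helper, and exempting `Location_On_Tool` is an assumption based on coordinates such as [-1, -1] legitimately repeating a value.

```python
import json


def find_duplicate_arrays(node, path="$", allowed=("Location_On_Tool",)):
    """Return the paths of JSON arrays that contain a repeated element.

    Keys listed in `allowed` are skipped (assumption: a coordinate such as
    [-1, -1] may legitimately repeat a value).
    """
    found = []
    if isinstance(node, dict):
        for key, value in node.items():
            if key not in allowed:
                found += find_duplicate_arrays(value, f"{path}.{key}", allowed)
    elif isinstance(node, list):
        # Serialize elements with sorted keys so dicts and lists compare by content
        keys = [json.dumps(item, sort_keys=True) for item in node]
        if len(keys) != len(set(keys)):
            found.append(path)
        for i, item in enumerate(node):
            found += find_duplicate_arrays(item, f"{path}[{i}]", allowed)
    return found


# Hand-written document standing in for json_output[0]
sample = '{"PRODS": {"PART_ID": [{"MANUFACTURE": [{"Process": "ROLLING"}, {"Process": "ROLLING"}], "PARAMETER": [{"Location_On_Tool": [-1, -1]}]}]}}'
print(find_duplicate_arrays(json.loads(sample)))
```

Applied to the answer's result, for example `find_duplicate_arrays(json.loads(json_output[0]))`, an empty list means no checked array repeats a value.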

If this is what you were looking for, please mark my answer as the solution 😁
