Exploding a JSON column with pyspark

khbbv19g · published 2023-01-04 in Spark

I have a DataFrame as below:

+-----------------------------------------------------------------------------------------------+-----------------------+
|value                                                                                          |timestamp              |
+-----------------------------------------------------------------------------------------------+-----------------------+
|{"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"}}|2023-01-03 11:02:11.975|
|{"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"}}  |2023-01-03 11:02:11.976|
|{"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"}}       |2023-01-03 11:02:11.976|
|{"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"}} |2023-01-03 11:02:11.976|
+-----------------------------------------------------------------------------------------------+-----------------------+
root
 |-- value: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

Expected result using pyspark:

+----+----------+---------+---------------------+
|id  |first_name|last_name|email                |
+----+----------+---------+---------------------+
|1001|Sally     |Thomas   |sally.thomas@acme.com|
|1002|George    |Bailey   |gbailey@foobar.com   |
|1003|Edward    |Walker   |ed@walker.com        |
|1004|Anne      |Kretchmar|annek@noanswer.org   |
+----+----------+---------+---------------------+

Any help is appreciated.


63lcw9qa1#

You can do this with the from_json function from pyspark.sql.functions. The function requires a schema, which you can define yourself. Here is your DataFrame:

df = spark.createDataFrame(
    [
        ("""{"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"}}""",
         "2023-01-03 11:02:11.975"),
        ("""{"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"}}""",
         "2023-01-03 11:02:11.976"),
        ("""{"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"}}""",
         "2023-01-03 11:02:11.976"),
        ("""{"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"}}""",
         "2023-01-03 11:02:11.976"),
    ],
    ["value", "timestamp"]
)

A possible solution looks like this:

from pyspark.sql.types import IntegerType, StringType, StructType, StructField
from pyspark.sql.functions import from_json

schema = StructType([
    StructField("after", StructType([
        StructField("id", IntegerType(), True),
        StructField("first_name", StringType(), True),
        StructField("last_name", StringType(), True),
        StructField("email", StringType(), True),
    ]), True),
    StructField("timestamp", StringType(), True)
])

output = df.withColumn("value", from_json("value", schema)) \
           .select("value.after.*")

output.show()
+----+----------+---------+--------------------+
|  id|first_name|last_name|               email|
+----+----------+---------+--------------------+
|1001|     Sally|   Thomas|sally.thomas@acme...|
|1002|    George|   Bailey|  gbailey@foobar.com|
|1003|    Edward|   Walker|       ed@walker.com|
|1004|      Anne|Kretchmar|  annek@noanswer.org|
+----+----------+---------+--------------------+

The final select statement expands the nested struct into separate columns.
With the different pyspark.sql.types you can build any kind of schema, control whether fields are nullable, and so on. More information on the data types can be found here.


idfiyjo82#

You can use pyspark's from_json function to parse the JSON string(s). The function needs a schema to parse against; in your case the result is a struct of structs.

import pyspark.sql.functions as func

data_sdf. \
    withColumn('parsed_json',
               func.from_json('value',
                              'after struct<id: bigint, first_name: string, last_name: string, email: string>'
                              )
               ). \
    withColumn('inner_struct', func.col('parsed_json.after')). \
    selectExpr('ts', 'inner_struct.*'). \
    show(truncate=False)

# +-----------------------+----+----------+---------+---------------------+
# |ts                     |id  |first_name|last_name|email                |
# +-----------------------+----+----------+---------+---------------------+
# |2023-01-03 11:02:11.975|1001|Sally     |Thomas   |sally.thomas@acme.com|
# |2023-01-03 11:02:11.976|1002|George    |Bailey   |gbailey@foobar.com   |
# |2023-01-03 11:02:11.976|1003|Edward    |Walker   |ed@walker.com        |
# |2023-01-03 11:02:11.976|1004|Anne      |Kretchmar|annek@noanswer.org   |
# +-----------------------+----+----------+---------+---------------------+

The parsed data looks like this:

data_sdf. \
    withColumn('parsed_json', 
               func.from_json('value', 
                              'after struct<id: bigint, first_name: string, last_name: string, email: string>'
                              )
               ). \
    withColumn('inner_struct', func.col('parsed_json.after')). \
    show(truncate=False)

# +-----------------------------------------------------------------------------------------------+-----------------------+----------------------------------------------+--------------------------------------------+
# |value                                                                                          |ts                     |parsed_json                                   |inner_struct                                |
# +-----------------------------------------------------------------------------------------------+-----------------------+----------------------------------------------+--------------------------------------------+
# |{"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"}}|2023-01-03 11:02:11.975|{{1001, Sally, Thomas, sally.thomas@acme.com}}|{1001, Sally, Thomas, sally.thomas@acme.com}|
# |{"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"}}  |2023-01-03 11:02:11.976|{{1002, George, Bailey, gbailey@foobar.com}}  |{1002, George, Bailey, gbailey@foobar.com}  |
# |{"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"}}       |2023-01-03 11:02:11.976|{{1003, Edward, Walker, ed@walker.com}}       |{1003, Edward, Walker, ed@walker.com}       |
# |{"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"}} |2023-01-03 11:02:11.976|{{1004, Anne, Kretchmar, annek@noanswer.org}} |{1004, Anne, Kretchmar, annek@noanswer.org} |
# +-----------------------------------------------------------------------------------------------+-----------------------+----------------------------------------------+--------------------------------------------+

# root
#  |-- value: string (nullable = true)
#  |-- ts: string (nullable = true)
#  |-- parsed_json: struct (nullable = true)
#  |    |-- after: struct (nullable = true)
#  |    |    |-- id: long (nullable = true)
#  |    |    |-- first_name: string (nullable = true)
#  |    |    |-- last_name: string (nullable = true)
#  |    |    |-- email: string (nullable = true)
#  |-- inner_struct: struct (nullable = true)
#  |    |-- id: long (nullable = true)
#  |    |-- first_name: string (nullable = true)
#  |    |-- last_name: string (nullable = true)
#  |    |-- email: string (nullable = true)
