PySpark: JSON column name as a row value

w7t8yxp5  posted on 2022-11-01  in Spark

I have the following JSON data, which I read with df = spark.read.json('file'). I want to take the value "9ac2b5fc-d2c5-43a8-a9e6-244f02b93997", which appears as a key in the JSON, and put it in a new column "CustomerId".

{
  "Statements": {
    "9ac2b5fc-d2c5-43a8-a9e6-244f02b93997": {
      "Accounts": [
        {
          "Id": 12345678,
          "Institution": "Bank Name",
          "Name": "Savings Name",
          "AccountNumber": "00000000",
          "Bsb": "000000",
          "CurrentBalance": "0",
          "Available": "0",
          "AccountHolder": "A",
          "AccountAddress": null,
          "AccountType": "SAVINGS",
          "OpeningBalance": "0.0",
          "ClosingBalance": "0.0"
        }
      ]
    }
  }
}

So far I have managed to get the accounts part like this:

from pyspark.sql.functions import explode

cols = ["id"]
# Rename the single UUID-named column under Statements to "id"
df_statement = df.select("Statements.*").toDF(*cols)
df_statement = df_statement.withColumn("accounts", explode("id.Accounts"))
df_statement.select("accounts.Institution", "accounts.Bsb", "accounts.AccountNumber").show()

which returns:

+------------+--------+-------------+
| Institution|   Bsb  |AccountNumber|
+------------+--------+-------------+
|  Bank Name | 000000 |    00000000 |

I would like it to return something like this instead. Thanks!

+--------------------------------------+-----------+------------+-------------+
|               CustomerId             |Institution|   Bsb      |AccountNumber|
+--------------------------------------+-----------+------------+-------------+
| 9ac2b5fc-d2c5-43a8-a9e6-244f02b93997 | Bank Name | 000000     |    00000000 |

2izufjch  #1

I'm not sure this solution fits your case, but please check it. I tested it with my own hosted JSON file containing the data you provided.

import urllib.request

from pyspark.sql import functions as F

# Browser-like request headers sent to the JSON host
header = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) '
          'AppleWebKit/537.11 (KHTML, like Gecko) '
          'Chrome/23.0.1271.64 Safari/537.11',
          'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
          'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
          'Accept-Encoding': 'none',
          'Accept-Language': 'en-US,en;q=0.8',
          'Connection': 'keep-alive'}

url = 'https://api.jsonbin.io/v3/qs/635c24e30e6a79321e378a29'
req = urllib.request.Request(url=url, headers=header)

# Download the JSON as text and let Spark infer the schema from it
# (`spark` is assumed to be an existing SparkSession)
jsonData = urllib.request.urlopen(req).read().decode('utf-8')
rdd = spark.sparkContext.parallelize([jsonData])
df = spark.read.json(rdd)

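# The hosted file wraps the data in a top-level "record" field, hence the "record." prefix.
# The UUID is the only field under Statements, so it is the first column name.
df2 = df.select("record.Statements.*")
uuid = df2.columns[0]

# print(uuid)

df = (
    df
    # One row per entry in the Accounts array
    .withColumn("accounts", F.explode(f"record.Statements.{uuid}.Accounts"))
    # The UUID is a field name, not data, so add it as a literal value
    .withColumn("CustomerId", F.lit(uuid))
    .withColumn("Institution", F.col("accounts.Institution"))
    .withColumn("Bsb", F.col("accounts.Bsb"))
    .withColumn("AccountNumber", F.col("accounts.AccountNumber"))
    # Drop the nested source columns, keeping only the flattened ones
    .drop("id")
    .drop("metadata")
    .drop("record")
    .drop("accounts")
)
df.show(truncate=False)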

Output:

+------------------------------------+-----------+------+-------------+
|CustomerId                          |Institution|Bsb   |AccountNumber|
+------------------------------------+-----------+------+-------------+
|9ac2b5fc-d2c5-43a8-a9e6-244f02b93997|Bank Name  |000000|00000000     |
+------------------------------------+-----------+------+-------------+
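
Note that the code above takes the first column name, so it picks up only one UUID. If the file could contain several customer keys under "Statements", the same idea (turning the key into a value) can be sketched in plain Python before the data ever reaches Spark. This is only an illustration under assumptions: the helper name flatten_statements and the shortened sample document are made up for the example, and it assumes the file is small enough to parse on the driver.

```python
import json

# Sample document with the same shape as the question's data (fields shortened).
raw = """
{
  "Statements": {
    "9ac2b5fc-d2c5-43a8-a9e6-244f02b93997": {
      "Accounts": [
        {"Institution": "Bank Name", "Bsb": "000000", "AccountNumber": "00000000"}
      ]
    }
  }
}
"""


def flatten_statements(doc, fields=("Institution", "Bsb", "AccountNumber")):
    """Hypothetical helper: one row per account, with the Statements key as CustomerId."""
    rows = []
    # Each key under "Statements" is a customer id; its value holds the accounts.
    for customer_id, statement in doc.get("Statements", {}).items():
        # Treat a missing or null "Accounts" entry as an empty list.
        for account in statement.get("Accounts") or []:
            row = {"CustomerId": customer_id}
            row.update({name: account.get(name) for name in fields})
            rows.append(row)
    return rows


rows = flatten_statements(json.loads(raw))
print(rows)
```

If a Spark DataFrame is still needed, a list of dicts like this could probably be passed to spark.createDataFrame(rows); I have not tested that against the hosted file.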
