I need to convert a Spark DataFrame to a nested JSON structure

Posted by oxiaedzo on 2023-03-09 in Spark

I am trying to convert the DataFrame below to nested JSON and write it to a JSON file.

+--------------------+--------------+-------------+-----------+
|       Trans_Type   |Payment_Due_Dt|BILL_CYCLE_ID|BALANCE_AMT|
+--------------------+--------------+-------------+-----------+
|  New Transaction   |    26-01-2015|   31-12-2014|      61.20|
|Current Transactions|    26-01-2015|   31-12-2014|     289.02|
|  Age Transaction   |              |             |       0.00|
|Clear Transaction   |              |             |       0.00|
|      Remittances   |              |             |     -61.20|
+--------------------+--------------+-------------+-----------+
spark_df.printSchema()
root
 |-- Trans_Type: string (nullable = false)
 |-- Payment_Due_Dt: string (nullable = true)
 |-- BILL_CYCLE_ID: string (nullable = true)
 |-- BALANCE_AMT: decimal(25,2) (nullable = false)

The account balance is calculated as sum(BALANCE_AMT) = 647.00.

Expected JSON output:

{
    "Summary": [{
            "Age Transactions": {
                "Payment_Due_Dt": " ",
                "BILL_CYCLE_ID": " ",
                "BALANCE_AMT": 0.00
            }
        },
        {
            "Clear Transactions": {
                "Payment_Due_Dt": " ",
                "BILL_CYCLE_ID": " ",
                "BALANCE_AMT": 0.00
            }
        },
        {
            "Current Transactions": {
                "Payment_Due_Dt": "26-01-2015",
                "BILL_CYCLE_ID": "31-12-2014",
                "BALANCE_AMT": 289.02
            }
        },
        {
            "New Transactions": {
                "Payment_Due_Dt": "26-01-2015",
                "BILL_CYCLE_ID": "31-12-2014",
                "BALANCE_AMT": 61.20
            }   
        },
        {
            "Remittances" : {
                "BALANCE_AMT": 61.20
            }
            
        },
        {
            "Account Balance": {
            "BAL_AM": "647.00"
            }
        }
    ]
}

With the following code snippet I cannot get the desired output:

list1 = [Txn_Sum_base2_DF["Payment_Due_Dt"], Txn_Sum_base2_DF["BILL_CYCLE_ID"], Txn_Sum_base2_DF["BALANCE_AMT"] ]
df = Txn_Sum_base2_DF.groupBy("Trans_Type") \
            .agg(collect_list(struct(list1)).alias("summary")).toJSON().collect()

The code above gives me a JSON structure, but it does not match the expected result.

Answer #1 (5lhxktic)

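I was able to do this a little differently. First, I create a struct of the attributes, then pivot it with the transaction types as columns. Next, I create a struct from all the transaction columns, which becomes the summary field. The account balance is a sum (as stated in the question), so I create a DataFrame containing the sum and join it in (this can also be done with aggregate / transform).
Here is the approach using sample data (adapt it to your own column names and requirements):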

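from pyspark.sql import functions as func

# data_sdf is the sample DataFrame with columns trans_type, pay_due_dt, bill_cycle_id, bal_amt
# pack the attributes into a struct, then pivot so that each transaction type becomes a column
data1_sdf = data_sdf. \
    withColumn('attr', func.struct('pay_due_dt', 'bill_cycle_id', 'bal_amt')). \
    groupBy(func.lit(1).alias('dropme')). \
    pivot('trans_type'). \
    agg(func.first('attr'))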

# +------+-----------------+-----------------+--------------------------------+------------------------------+-------------------+
# |dropme|Age Transaction  |Clear Transaction|Current Transactions            |New Transaction               |Remittances        |
# +------+-----------------+-----------------+--------------------------------+------------------------------+-------------------+
# |1     |{null, null, 0.0}|{null, null, 0.0}|{26-01-2015, 31-12-2014, 289.02}|{26-01-2015, 31-12-2014, 61.2}|{null, null, -61.2}|
# +------+-----------------+-----------------+--------------------------------+------------------------------+-------------------+

# calculate account balance
data2_sdf = data_sdf. \
    select(func.sum('bal_amt').alias('acc_bal'), func.lit(1).alias('dropme'))

# +-------+------+
# |acc_bal|dropme|
# +-------+------+
# | 289.02|     1|
# +-------+------+

# create final source for json
fnl_data_sdf = data1_sdf. \
    join(data2_sdf, ['dropme'], 'left'). \
    drop('dropme'). \
    withColumn('summary', func.struct(*data1_sdf.drop('dropme').columns))
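
The acc_bal value of 289.02 shown above is the sum over the five sample rows only. The question quotes 647.00, which these rows do not add up to; presumably that figure comes from data not shown. A quick check of the arithmetic in plain Python, using only the sample values from the question (the names sample_balances and account_balance are illustrative):

```python
from decimal import Decimal

# BALANCE_AMT values of the five sample rows shown in the question.
sample_balances = [
    Decimal("61.20"),    # New Transaction
    Decimal("289.02"),   # Current Transactions
    Decimal("0.00"),     # Age Transaction
    Decimal("0.00"),     # Clear Transaction
    Decimal("-61.20"),   # Remittances
]

# Decimal keeps the two-digit scale exact, like the decimal(25,2) column.
account_balance = sum(sample_balances, Decimal("0.00"))
print(account_balance)  # 289.02
```

The remittance cancels the new transaction, so only the current transactions contribute to the total.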

Now there are 2 options. You can create a field holding the JSON string using to_json() / toJSON(), or write the summary and acc_bal fields out as JSON files.

# using toJSON() to create json string
fnl_data_sdf. \
    select('summary', 'acc_bal'). \
    toJSON(). \
    collect()[0]

# '{"summary":{"Age Transaction":{"bal_amt":0.0},"Clear Transaction":{"bal_amt":0.0},"Current Transactions":{"pay_due_dt":"26-01-2015","bill_cycle_id":"31-12-2014","bal_amt":289.02},"New Transaction":{"pay_due_dt":"26-01-2015","bill_cycle_id":"31-12-2014","bal_amt":61.2},"Remittances":{"bal_amt":-61.2}},"acc_bal":289.02}'

# writing json files
fnl_data_sdf. \
    select('summary', 'acc_bal'). \
    coalesce(1). \
    write.json('/content/drive/MyDrive/json_output', mode='overwrite')
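
The output above nests all transaction types under a single summary object, whereas the question asks for a list of single-key objects followed by an Account Balance entry. A minimal sketch of that reshaping in plain Python, assuming the result is a single small row that can be collected to the driver. The helper name to_summary_list is hypothetical, the input is the JSON string printed by toJSON() above, and the field names stay as in the sample data rather than the upper-case names in the question:

```python
import json

# JSON string as produced by toJSON() in the answer (sample data only).
raw = (
    '{"summary":{"Age Transaction":{"bal_amt":0.0},'
    '"Clear Transaction":{"bal_amt":0.0},'
    '"Current Transactions":{"pay_due_dt":"26-01-2015",'
    '"bill_cycle_id":"31-12-2014","bal_amt":289.02},'
    '"New Transaction":{"pay_due_dt":"26-01-2015",'
    '"bill_cycle_id":"31-12-2014","bal_amt":61.2},'
    '"Remittances":{"bal_amt":-61.2}},"acc_bal":289.02}'
)


def to_summary_list(json_str):
    """Reshape {"summary": {...}, "acc_bal": x} into {"Summary": [{name: attrs}, ...]}."""
    doc = json.loads(json_str)
    # One single-key object per transaction type, in the order they appear in the input.
    items = [{name: attrs} for name, attrs in doc["summary"].items()]
    # Append the total as its own entry; the "BAL_AM" key follows the question.
    items.append({"Account Balance": {"BAL_AM": doc["acc_bal"]}})
    return {"Summary": items}


reshaped = to_summary_list(raw)
print(json.dumps(reshaped, indent=4))
```

Because toJSON() omits null fields, entries such as Age Transaction carry only bal_amt. If the blank date strings from the expected output are required, they would have to be filled in before serialising.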
