我正在尝试将下面的 Dataframe 转换为嵌套的json并写入json文件。
| Trans_Type |Payment_Due_Dt|BILL_CYCLE_ID|BALANCE_AMT|
+--------------------+--------------+-------------+-----------+
| New Transaction | 26-01-2015| 31-12-2014| 61.20|
|Current Transactions| 26-01-2015| 31-12-2014| 289.02|
| Age Transaction | | | 0.00|
|Clear Transaction | | | 0.00|
| Remittances | | | -61.20|
+--------------------+--------------+-------------+-----------+
spark_df.printSchema()
root
|-- Trans_Type: string (nullable = false)
|-- Payment_Due_Dt: string (nullable = true)
|-- BILL_CYCLE_ID: string (nullable = true)
|-- BALANCE_AMT: decimal(25,2) (nullable = false)
帐户余额的计算公式为:总和(余额金额)= 647.00预期Json O/p -
{
"Summary": [{
"Age Transactions": {
"Payment_Due_Dt": " ",
"BILL_CYCLE_ID": " ",
"BALANCE_AMT": 0.00
}
},
{
"Clear Transactions": {
"Payment_Due_Dt": " ",
"BILL_CYCLE_ID": " ",
"BALANCE_AMT": 0.00
}
},
{
"Current Transactions": {
"Payment_Due_Dt": "26-01-2015",
"BILL_CYCLE_ID": "31-12-2014",
"BALANCE_AMT": 289.02
}
},
{
"New Transactions": {
"Payment_Due_Dt": "26-01-2015",
"BILL_CYCLE_ID": "31-12-2014",
"BALANCE_AMT": 61.20
}
},
{
"Remittances" : {
"BALANCE_AMT": 61.20
}
},
{
"Account Balance": {
"BAL_AM": "647.00"
}
}
]
}
使用此代码片段,我无法获得所需的o/p
list1 = [Txn_Sum_base2_DF["Payment_Due_Dt"], Txn_Sum_base2_DF["BILL_CYCLE_ID"], Txn_Sum_base2_DF["BALANCE_AMT"] ]
df = Txn_Sum_base2_DF.groupBy("Trans_Type") \
.agg(collect_list(struct(list1)).alias("summary")).toJSON().collect()
上面的代码给了我json结构,但是它与result不匹配。
1条答案
按热度按时间5lhxktic1#
我能够做这一点不同.首先,我为属性创建struct,然后透视他们与事务类型作为列.然后,我创建一个struct与所有事务,这将是
summary
字段.这account_balance
是一个总和(如问题中所述),因此我创建了一个包含总和的 Dataframe 并将其加入(* 这也可以使用aggregate
和transform
* 完成)。以下是使用示例数据的方法 (请根据您的规格进行调整)
现在,有2个选项.你可以用json字符串创建一个字段,使用
to_json()
/toJSON()
或者把summary
和acc_bal
字段写为json.