PySpark: creating a generic function to transpose columns, with a dictionary as the value in another column

tv6aics1  posted 12 months ago in Spark

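I have the Spark DataFrame below. I am trying to turn its columns into rows, so that the values of one column, gw, become the keys of a dictionary stored in another column. I would like to write a generic function that accepts multiple columns as input.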

+--------------+------+------+
|gw            |rrc   |re_est|
+--------------+------+------+
|210.142.27.137|1400.0|26.0  |
|210.142.27.202|2300  |12    |
+--------------+------+------+

The expected output is:

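+------+----------------------------------------------+
|index |gw_mapping                                    |
+------+----------------------------------------------+
|rrc   |{210.142.27.137: 1400.0, 210.142.27.202: 2300}|
|re_est|{210.142.27.137: 26.0, 210.142.27.202: 12}    |
+------+----------------------------------------------+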

What I have tried:

from pyspark.sql import functions as F

result_df = (
    df
    .select('gw', F.expr("stack(2, 'rrc', rrc, 're_est', re_est) AS (index, value)"))
    .groupBy('index')
    .agg(F.expr("map_from_entries(collect_list(struct(gw, value))) AS gw_mapping"))
)

How can I turn this into a generic function that accepts multiple columns (such as rrc and re_est)?


yqhsw0fo  #1

To generalize the solution, you can build the stack expression dynamically from a list of column names:

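from pyspark.sql import functions as F

cols = ['rrc', 're_est']
# Build the "'name', column" pairs that stack() expects, one pair per column.
pairs = ', '.join(f"'{c}', {c}" for c in cols)
expr = f"stack({len(cols)}, {pairs}) AS (index, value)"

result_df = (
    df
    .select('gw', F.expr(expr))
    .groupBy('index')
    # Collect the (gw, value) pairs for each index and turn them into a map.
    .agg(F.expr("map_from_entries(collect_list(struct(gw, value))) AS gw_mapping"))
)
result_df.show(truncate=False)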

+------+----------------------------------------------------+
|index |gw_mapping                                          |
+------+----------------------------------------------------+
|re_est|{210.142.27.137 -> 26.0, 210.142.27.202 -> 12.0}    |
|rrc   |{210.142.27.137 -> 1400.0, 210.142.27.202 -> 2300.0}|
+------+----------------------------------------------------+
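
Since the question asks for a reusable function, the dynamic expression above could be wrapped roughly as follows. This is only a sketch: the names build_stack_expr and transpose_to_mapping are hypothetical, and it assumes the column names are plain identifiers (no spaces, dots, or quotes) and that the stacked columns share a compatible type, casting them beforehand if they do not.

```python
def build_stack_expr(cols):
    """Build a Spark SQL stack() expression that turns `cols` into (index, value) rows."""
    if not cols:
        raise ValueError("cols must contain at least one column name")
    # Each column contributes a "'name', column" pair to stack().
    pairs = ", ".join(f"'{c}', {c}" for c in cols)
    return f"stack({len(cols)}, {pairs}) AS (index, value)"


def transpose_to_mapping(df, key_col, cols, map_col="gw_mapping"):
    """Return one row per name in `cols`, with a map of key_col -> value in `map_col`."""
    # Imported here so that build_stack_expr can be used without a Spark installation.
    from pyspark.sql import functions as F

    return (
        df
        .select(key_col, F.expr(build_stack_expr(cols)))
        .groupBy("index")
        .agg(
            F.map_from_entries(
                F.collect_list(F.struct(F.col(key_col), F.col("value")))
            ).alias(map_col)
        )
    )
```

With the DataFrame from the question, the call would be transpose_to_mapping(df, 'gw', ['rrc', 're_est']), which runs the same stack, group, and map_from_entries steps as the answer above.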
