如何在pyspark中动态地将一列转换成多行?

z9smfwbn  于 2021-07-13  发布在  Spark
关注(0)|答案(2)|浏览(355)

我有一个如下所示的Dataframe
colnamedefghijkl列名
基于一个特定的参数,我想把这些值转换成行。例如,如果参数值为3,那么新的dataframe如下所示
COL2COL3ABCDEFGHIJKL公司
但是,如果参数值为4,则如下所示
冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷空气冷
注意以下几点:
列名并不重要
单列中的项数和参数都可以更改
你知道如何在Pypark中实现这一点吗?提前谢谢。

4smxwvx5

4smxwvx51#

您可以添加一些辅助列来透视Dataframe:

import pyspark.sql.functions as F

x = 3

result = df.withColumn(
    'id',
    F.monotonically_increasing_id()
).withColumn(
    'id2',
    (F.col('id') / x).cast('int')
).withColumn(
    'id3',
    F.col('id') % x
).groupBy('id2').pivot('id3').agg(F.first('ColName')).orderBy('id2').drop('id2')

result.show()
+---+---+---+
|  0|  1|  2|
+---+---+---+
|  a|  b|  c|
|  d|  e|  f|
|  g|  h|  i|
|  j|  k|  l|
+---+---+---+
wlzqhblo

wlzqhblo2#

你可以使用 collect_list 以及 row_number 为了达到这个目的。
第一步:生成一个自定义的行号。

from pyspark.sql.functions import floor, row_number, collect_list
from pyspark.sql.window import Window

no_of_columns = 3

df2 = df.withColumn("row_num", floor((row_number().over(Window.orderBy("ColName"))-1)/no_of_columns))

第二步:用这个行号将数据分组并使用 collect_list 创建列表。

df3 = df2.groupBy("row_num").agg(collect_list("ColName").alias("col_list"))

步骤3:使用python的列表理解从这个列表中选择所有元素。

df3.select(*[df3.col_list[i].alias(f"col{i+1}") for i in range(no_of_columns)]).show()

输出:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   a|   b|   c|
|   d|   e|   f|
|   g|   h|   i|
|   j|   k|   l|
+----+----+----+

注:参数 no_of_columns 可以根据所需的输出列数进行更改。

相关问题