pyspark获取structtype输出,用于从列表计算分布

sy5wg1nm  于 2021-07-13  发布在  Spark
关注(0)|答案(2)|浏览(267)

问题是:

我有一个列,其中有一个冗余值列表,需要在pysparkDataframe的新列中将其转换为类似字典的格式。
场景:
这是我的PyparkDataframe:
acall大学10rdk[1,1,1,2,2]10usw[1,2,2,2,2]8rdk[21,21,21,20]8rdj[20,20,21]10rdk[45,45,45,45,2,2,2]7ssw[6,6,6,19,19]
这是我用下面一行创建的:

my_df.groupBy(['A', 'C']).agg(collect_list("Class").alias("all_classes"))

现在我需要一个新的列来整理数据,如下所示:
考虑上一列中第一行的数据: [1, 1, 1, 2, 2] 需要转化为 {1: 3, 2: 2} 基本上就是数字本身和它被复制的次数。

我的尝试:

因为我对python很在行,所以我选择这样写一个udf:

custom_collect_function = udf(lambda li: {k: li.count(k) for k in set(li)}, StructType(li, ArrayType(elementType=IntegerType()), False))

display(my_df.withColumn("Distribution", custom_collect_function(my_df.all_classes)))

我显然没有在这里设置structtype,我欢迎任何其他/优化的方法。我也欢迎任何星火燎原的方法。
预期输出如下:
acdistributionall\U类10rdk{1:3,2:2}[1,1,1,2,2]10usw{1:1,2:6}[1,2,2,2,2]8rdk{21:4,20:1}[21,21,21,20]8rdj{20:2,21:1}[20,20,21]10rdk{45:4,2:3}[45,45,45,45,2,2,2]7ssw{6:3,19:2}[6,6,19,19]

fquxozlt

fquxozlt1#

实际上,在创建列表时,最好在第一个groupby中这样做 all_classes 从原来的 my_df :

from pyspark.sql import Window
from pyspark.sql import functions as F

df1 = df.withColumn("cnt", F.count("*").over(Window.partitionBy("A", "C", "Class"))) \
    .groupBy("A", "C") \
    .agg(
    F.map_from_entries(
        F.collect_set(F.struct(F.col("Class"), F.col("cnt")))
    ).alias("Distribution"),
    F.collect_list("Class").alias("all_classes"),
)

df1.show(truncate=False)

# +---+---+-------------------------+----------------------------------------+

# |A  |C  |Distribution             |all_classes                             |

# +---+---+-------------------------+----------------------------------------+

# |7  |SSW|[6 -> 3, 19 -> 2]        |[19, 19, 6, 6, 6]                       |

# |8  |RDK|[20 -> 1, 21 -> 4]       |[20, 21, 21, 21, 21]                    |

# |10 |RDK|[2 -> 5, 1 -> 3, 45 -> 4]|[2, 2, 2, 2, 2, 1, 1, 1, 45, 45, 45, 45]|

# |8  |RDJ|[20 -> 2, 21 -> 1]       |[21, 20, 20]                            |

# |10 |USW|[2 -> 6, 1 -> 1]         |[1, 2, 2, 2, 2, 2, 2]                   |

# +---+---+-------------------------+----------------------------------------+

在这里,我们使用一个窗口统计每个类的出现次数,然后按 A 以及 C 创造 all_classes 就像你已经做过的,但也创造了 Distribution 作为count列和类的Map列,使用 map_from_entries 功能。您可以稍后使用 to_json 如果需要json字符串,请单击map列。

xwbd5t1u

xwbd5t1u2#

你可以试试这个:

from pyspark.sql import functions as F, Window

result = my_df.withColumn(
    'all_classes',
    F.collect_list('Class').over(Window.partitionBy('A', 'C'))
).groupBy(
    'A', 'C', 'all_classes', 'Class'
).agg(
    F.count('Class').alias('cnt_class')
).groupBy(
    'A', 'C', 'all_classes'
).agg(
    F.map_from_entries(
        F.collect_list(F.struct('Class', 'cnt_class'))
    ).alias('distribution')
)

result.show(truncate=False)
+---+---+----------------------------------------+-------------------------+
|A  |C  |all_classes                             |distribution             |
+---+---+----------------------------------------+-------------------------+
|7  |SSW|[6, 6, 6, 19, 19]                       |[6 -> 3, 19 -> 2]        |
|8  |RDK|[21, 21, 21, 21, 20]                    |[21 -> 4, 20 -> 1]       |
|10 |RDK|[1, 1, 1, 2, 2, 45, 45, 45, 45, 2, 2, 2]|[1 -> 3, 2 -> 5, 45 -> 4]|
|8  |RDJ|[20, 20, 21]                            |[20 -> 2, 21 -> 1]       |
|10 |USW|[1, 2, 2, 2, 2, 2, 2]                   |[1 -> 1, 2 -> 6]         |
+---+---+----------------------------------------+-------------------------+

结果有点不同,因为分组的Dataframe是由重复的行组成的 (A, C) = (10, RDK) . 还要注意,我使用了maptype。无法获取structtype,因为结构必须在给定的列中具有相同的字段,在本例中不是这样。

相关问题