Group by and aggregate twice

bf1o4zei asked on 2021-05-27 in Spark

I have a DataFrame like this:

City     State     Hour   Score     Percentage
DEN      CO        1      0         0
DEN      CO        1      0         0
DEN      CO        2      2         99
DEN      CO        3      0         0
NYC      NYC       1      0         0
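
(To reproduce, here is a minimal sketch that builds this sample DataFrame, assuming an active SparkSession named spark:)

df = spark.createDataFrame(
    [("DEN", "CO", 1, 0, 0),
     ("DEN", "CO", 1, 0, 0),
     ("DEN", "CO", 2, 2, 99),
     ("DEN", "CO", 3, 0, 0),
     ("NYC", "NYC", 1, 0, 0)],
    ["City", "State", "Hour", "Score", "Percentage"])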

I want it to look like this:

City     State     total_hours  total_scores  total_perct.   total_volume      
DEN      CO        [1,2,3]      [0,2,0]       [0,99,0]       [2,1,1]
NYC      NYC       [1]          [0]           [0]            [1]

For total_hours I'm just doing a collect_set per City and State. For total_scores I build a set of the scores within each particular hour, then collect those per-hour sets across all hours. Example: the first hour has two scores, 0 and 0, so I keep only one of them; the second hour has one score, 2, so the result becomes [0, 2]. The same applies to total_perct. For total_volume I take the row count of each hour and collect_list those counts across all hours of the same City and State.
That's basically what I'm trying to achieve. If I do a groupBy like this:

df.groupBy("city", "state", "hour")
  .agg(collect_set("Hour").alias("total_hours"), collect_set("Score").alias("total_scores"), 
       collect_set("Percentage").alias("total_perct."), count("hour").alias("total_volume"))

I get the following DataFrame:

City     State     total_hours  total_scores  total_perct.   total_volume 
DEN      CO        [1]          [0]           [0]            2
DEN      CO        [2]          [2]           [99]           1
DEN      CO        [3]          [0]           [0]            1
NYC      NYC       [1]          [0]           [0]            1

I don't understand what to do next. How can I go from what I have now to the final result? I'm using PySpark.


yduiuuwa 1#

Spark < 2.4
You need to use a udf, which unfortunately is slow in this case :(

import itertools
from pyspark.sql.functions import count, col, collect_list, collect_set, udf
from pyspark.sql.types import ArrayType, IntegerType

# Declare the return type explicitly: a bare @udf defaults to StringType
# and would stringify the flattened list instead of returning an array.
@udf(ArrayType(IntegerType()))
def flatten(col):
    return list(itertools.chain.from_iterable(col))

df.groupBy('City', 'State', 'Hour') \
  .agg(collect_set(col('Score')).alias('Score'), collect_set(col('Percentage')).alias('Percentage'), count(col('Hour')).alias('total_volume')) \
  .orderBy('City', 'State', 'Hour') \
  .groupBy('City', 'State') \
  .agg(collect_list(col('Hour')).alias('total_hours'), collect_list(col('Score')).alias('total_scores'), collect_list(col('Percentage')).alias('total_perct'), collect_list(col('total_volume')).alias('total_volume')) \
  .select('City', 'State', 'total_hours', flatten(col('total_scores')).alias('total_scores'), flatten(col('total_perct')).alias('total_perct'), 'total_volume') \
  .show(10, False)
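
Run against the sample data, this should print the same two rows as the Spark 2.4+ output below; only the udf-based flatten differs.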

Spark 2.4+
Here the built-in flatten does the work together with collect_set and collect_list:

from pyspark.sql.functions import count, col, collect_list, collect_set, flatten

df.groupBy('City', 'State', 'Hour') \
  .agg(collect_set(col('Score')).alias('Score'), collect_set(col('Percentage')).alias('Percentage'), count(col('Hour')).alias('total_volume')) \
  .orderBy('City', 'State', 'Hour') \
  .groupBy('City', 'State') \
  .agg(collect_list(col('Hour')).alias('total_hours'), flatten(collect_list(col('Score'))).alias('total_scores'), flatten(collect_list(col('Percentage'))).alias('total_perct.'), collect_list(col('total_volume')).alias('total_volume')) \
  .show(10, False)

+----+-----+-----------+------------+------------+------------+
|City|State|total_hours|total_scores|total_perct.|total_volume|
+----+-----+-----------+------------+------------+------------+
|NYC |NYC  |[1]        |[0]         |[0]         |[1]         |
|DEN |CO   |[1, 2, 3]  |[0, 2, 0]   |[0, 99, 0]  |[2, 1, 1]   |
+----+-----+-----------+------------+------------+------------+

If you leave out the orderBy at that step, the order of the resulting lists will come out mixed up.
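
Strictly speaking, Spark does not guarantee that collect_list preserves the order established by an upstream orderBy once the data is shuffled again for the second groupBy. Below is an order-safe variant for Spark 2.4+, as a sketch; the struct/sort_array combination is my own substitution, not part of the answer above. It collects each hour's aggregates into a struct, sorts the array explicitly by Hour, then projects the fields back out.

from pyspark.sql.functions import struct, collect_list, collect_set, count, sort_array, flatten, col

(df.groupBy('City', 'State', 'Hour')
   .agg(collect_set('Score').alias('Score'),
        collect_set('Percentage').alias('Percentage'),
        count('Hour').alias('total_volume'))
   .groupBy('City', 'State')
   # sort_array orders structs by their first field (Hour), so no orderBy is needed.
   .agg(sort_array(collect_list(struct('Hour', 'Score', 'Percentage', 'total_volume'))).alias('rows'))
   # Field access on an array of structs yields an array of that field.
   .select('City', 'State',
           col('rows.Hour').alias('total_hours'),
           flatten(col('rows.Score')).alias('total_scores'),
           flatten(col('rows.Percentage')).alias('total_perct.'),
           col('rows.total_volume').alias('total_volume'))
   .show(10, False))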


cdmah0mi 2#

Another way: use a window to compute the number of occurrences of each Hour, filter down to one row per partition with an index (idx), and then finish with groupBy + collect_list.

import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("City", "State", "Hour")
cols = ["Hour", "Score", "Percentage", "Volume"]

(df.withColumn("idx", F.monotonically_increasing_id())
   # Volume = rows per (City, State, Hour); Indx = the last idx in that group.
   .select("*", F.count("Hour").over(w).alias("Volume"),
           F.max("idx").over(w).alias("Indx"))
   # Keep exactly one row per (City, State, Hour).
   .filter(F.col("idx") == F.col("Indx"))
   .orderBy("idx")
   .groupBy("City", "State")
   .agg(*[F.collect_list(c).alias(f"total_{c}") for c in cols])
   .show())

Output:

+----+-----+----------+-----------+----------------+------------+
|City|State|total_Hour|total_Score|total_Percentage|total_Volume|
+----+-----+----------+-----------+----------------+------------+
| NYC|  NYC|       [1]|        [0]|             [0]|         [1]|
| DEN|   CO| [1, 2, 3]|  [0, 2, 0]|      [0, 99, 0]|   [2, 1, 1]|
+----+-----+----------+-----------+----------------+------------+
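
The same keep-one-row-per-group step can also be written with row_number instead of max(idx), which makes the "pick exactly one row" intent explicit. A sketch with my own variable names, not part of the answer:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("City", "State", "Hour")

deduped = (df.withColumn("idx", F.monotonically_increasing_id())
             .withColumn("Volume", F.count("Hour").over(w))
             # row_number over the idx-ordered window keeps the first row of each group.
             .withColumn("rn", F.row_number().over(w.orderBy("idx")))
             .filter(F.col("rn") == 1)
             .drop("rn"))

From there the same groupBy + collect_list aggregation applies.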
