pyspark 按计数对collect_set排序

crcmnpdw  于 2022-12-11  发布在  Spark
关注(0)|答案(2)|浏览(333)

我知道collect_set可以有随机的顺序。有没有不同的方法来排序collect_set的计数?我想有一个数组的最流行的项目为一个单一的列根据一个分组的一个单独的id列。你会有一个collect_list,然后运行计数?

6yt4nkrj

6yt4nkrj1#

不,没有按计数对collect_set进行排序的方法,因为collect aggregate方法不对项目进行计数,因此没有可用于对项目进行排序的信息。
因此,由于Spark 3.1及更高版本,并且给定一个dataframe,其中包含两列iditem,您可以:
1.对列iditems的groupBy执行count
1.将(count, item)对收集到具有collect_liststruct的数组中。* 注意:您可以在这里使用collect_set来代替collect_list,但这是无用的,因为我们确信(count, item)的每个元素都是唯一的 *
1.使用sort_array按降序计数对数组进行排序
1.用transformMap数组以删除count
其可以被翻译成如下代码:

from pyspark.sql import functions as F

final_df = dataframe.groupBy('id', 'item').count() \
  .groupBy('id') \
  .agg(
    F.transform(
      F.sort_array(
        F.collect_list(F.struct("count", "item")),
        asc=False
      ),
      lambda x: x.getItem('item')
    ).alias('popular_items')
  )
  • 注意:如果您的spark版本低于3.1但高于1.6,您可以将transform替换为withColumn,如下所示:*
from pyspark.sql import functions as F

final_df = dataframe.groupBy('id', 'item').count() \
  .groupBy('id') \
  .agg(F.sort_array(F.collect_list(F.struct("count", "item")), asc=False).alias('popular_items')) \
  .withColumn("popular_items", F.col('popular_items.item'))

示例

使用以下输入 Dataframe :

+---+-----+
|id |item |
+---+-----+
|1  |item1|
|1  |item2|
|1  |item2|
|1  |item2|
|1  |item3|
|2  |item3|
|2  |item3|
|2  |item1|
|3  |item1|
|3  |item1|
+---+-----+

您会得到以下输出:

+---+---------------------+
|id |popular_items        |
+---+---------------------+
|1  |[item2, item3, item1]|
|3  |[item1]              |
|2  |[item3, item1]       |
+---+---------------------+
nmpmafwu

nmpmafwu2#

如果我没理解错的话,你想做人气排名analysis.you需要使用collect_list来保存重复值。

from collections import Counter
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

def elem_cnt(arr):
    return ['{}({})'.format(*i) for i in Counter(arr).most_common()]

spark.udf.register('elem_cnt_udf', elem_cnt)

data = [
    ('AC Milan', 'Ronaldo Luiz'),
    ('AC Milan', 'Paolo Maldini'),
    ('AC Milan', 'Kaká'),
    ('AC Milan', 'Ronaldo Luiz'),
    ('AC Milan', 'Andriy Shevchenko'),
    ('AC Milan', 'Van Basten'),
    ('AC Milan', 'Ronaldo Luiz'),
    ('AC Milan', 'Andriy Shevchenko'),
    ('AC Milan', 'Van Basten'),
    ('Milan', 'Ronaldo Luiz'),
    ('Milan', 'Paolo Maldini'),
    ('Milan', 'Ronaldo Luiz'),
    ('Milan', 'Van Basten')
]
schema = """
    id string,name string
"""
df = spark.createDataFrame(data, schema)
df.createOrReplaceTempView('tmp')
rank_sql = """
    select id,elem_cnt_udf(collect_list(name)) rank from tmp
    group by id
"""
rank_df = spark.sql(rank_sql)
rank_df.show(truncate=False)

相关问题