我知道collect_set可以有随机的顺序。有没有不同的方法来排序collect_set的计数?我想有一个数组的最流行的项目为一个单一的列根据一个分组的一个单独的id列。你会有一个collect_list,然后运行计数?
6yt4nkrj1#
不,没有按计数对collect_set进行排序的方法,因为collect aggregate方法不对项目进行计数,因此没有可用于对项目进行排序的信息。因此,由于Spark 3.1及更高版本,并且给定一个dataframe,其中包含两列id和item,您可以:1.对列id和items的groupBy执行count1.将(count, item)对收集到具有collect_list和struct的数组中。* 注意:您可以在这里使用collect_set来代替collect_list,但这是无用的,因为我们确信(count, item)的每个元素都是唯一的 *1.使用sort_array按降序计数对数组进行排序1.用transformMap数组以删除count。其可以被翻译成如下代码:
collect_set
dataframe
id
item
items
count
(count, item)
collect_list
struct
sort_array
transform
from pyspark.sql import functions as F final_df = dataframe.groupBy('id', 'item').count() \ .groupBy('id') \ .agg( F.transform( F.sort_array( F.collect_list(F.struct("count", "item")), asc=False ), lambda x: x.getItem('item') ).alias('popular_items') )
withColumn
from pyspark.sql import functions as F final_df = dataframe.groupBy('id', 'item').count() \ .groupBy('id') \ .agg(F.sort_array(F.collect_list(F.struct("count", "item")), asc=False).alias('popular_items')) \ .withColumn("popular_items", F.col('popular_items.item'))
使用以下输入 Dataframe :
+---+-----+ |id |item | +---+-----+ |1 |item1| |1 |item2| |1 |item2| |1 |item2| |1 |item3| |2 |item3| |2 |item3| |2 |item1| |3 |item1| |3 |item1| +---+-----+
您会得到以下输出:
+---+---------------------+ |id |popular_items | +---+---------------------+ |1 |[item2, item3, item1]| |3 |[item1] | |2 |[item3, item1] | +---+---------------------+
nmpmafwu2#
如果我没理解错的话,你想做人气排名analysis.you需要使用collect_list来保存重复值。
from collections import Counter from pyspark.sql import SparkSession spark = SparkSession.builder.enableHiveSupport().getOrCreate() def elem_cnt(arr): return ['{}({})'.format(*i) for i in Counter(arr).most_common()] spark.udf.register('elem_cnt_udf', elem_cnt) data = [ ('AC Milan', 'Ronaldo Luiz'), ('AC Milan', 'Paolo Maldini'), ('AC Milan', 'Kaká'), ('AC Milan', 'Ronaldo Luiz'), ('AC Milan', 'Andriy Shevchenko'), ('AC Milan', 'Van Basten'), ('AC Milan', 'Ronaldo Luiz'), ('AC Milan', 'Andriy Shevchenko'), ('AC Milan', 'Van Basten'), ('Milan', 'Ronaldo Luiz'), ('Milan', 'Paolo Maldini'), ('Milan', 'Ronaldo Luiz'), ('Milan', 'Van Basten') ] schema = """ id string,name string """ df = spark.createDataFrame(data, schema) df.createOrReplaceTempView('tmp') rank_sql = """ select id,elem_cnt_udf(collect_list(name)) rank from tmp group by id """ rank_df = spark.sql(rank_sql) rank_df.show(truncate=False)
2条答案
按热度按时间6yt4nkrj1#
不,没有按计数对
collect_set
进行排序的方法,因为collect aggregate方法不对项目进行计数,因此没有可用于对项目进行排序的信息。因此,由于Spark 3.1及更高版本,并且给定一个
dataframe
,其中包含两列id
和item
,您可以:1.对列
id
和items
的groupBy执行count
1.将
(count, item)
对收集到具有collect_list
和struct
的数组中。* 注意:您可以在这里使用collect_set
来代替collect_list
,但这是无用的,因为我们确信(count, item)
的每个元素都是唯一的 *1.使用
sort_array
按降序计数对数组进行排序1.用
transform
Map数组以删除count
。其可以被翻译成如下代码:
transform
替换为withColumn
,如下所示:*示例
使用以下输入 Dataframe :
您会得到以下输出:
nmpmafwu2#
如果我没理解错的话,你想做人气排名analysis.you需要使用collect_list来保存重复值。