PySpark: build a per-id ratings-list column, setting missing ratings to 0

rsl1atfo · published 2021-05-29 · Spark

I have a dataframe like the one below, and I want to add a column "rating_list" that, grouped by id, collects the ratings into a list whose index is the item number:

id | item | rating
1  | 1    | 5
1  | 2    | 4
1  | 4    | 5
1  | 7    | 3
2  | 5    | 3
2  | 2    | 5
2  | 3    | 5

The ideal result would be:

id | rating_list
1  | [5,4,0,5,0,0,3]
2  | [0,5,5,0,3,0,0]

where the length of the rating list is the number of distinct items in the dataframe. So far I have a dataframe with an item list and a rating list, but I'm not sure whether this is an appropriate intermediate step:

id | item_list | rating_list
1  | [1,2,4,7] | [5,4,5,3]
2  | [2,3,5]   | [5,5,3]

This will be a huge dataframe, so I'd prefer something fast.
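For reference, a minimal sketch that reproduces the sample dataframe and the intermediate step above, assuming a local SparkSession:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Sample data from the table above
df = spark.createDataFrame(
    [(1, 1, 5), (1, 2, 4), (1, 4, 5), (1, 7, 3),
     (2, 5, 3), (2, 2, 5), (2, 3, 5)],
    ["id", "item", "rating"])

# Intermediate step: per-id item and rating lists
# (note: collect_list ordering is not guaranteed in general)
intermediate = df.groupBy("id").agg(
    F.collect_list("item").alias("item_list"),
    F.collect_list("rating").alias("rating_list"))
intermediate.show(truncate=False)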


vaqhlq811#

You can do this with a UDF.

from pyspark.sql.types import ArrayType, IntegerType
from pyspark.sql.functions import collect_list, col, create_map, udf, countDistinct, lit

# UDF: scatter each {item: rating} map into a fixed-length list,
# using item number - 1 as the list index
def get_rating_list(ratings_arr, num_items):
    ratings_list = [0] * num_items
    for map_elem in ratings_arr:
        for k, v in map_elem.items():
            ratings_list[k - 1] = v
    return ratings_list

# 1. Create a map column with item as key and rating as value
t1 = df.withColumn('item_rating_map', create_map(col('item'), col('rating')))

# 2. Group the dataframe on id and collect all the maps per id into an array
grouped_df = t1.groupBy('id').agg(collect_list('item_rating_map').alias('item_ratings'))

# 3. UDF object
rating_list_udf = udf(get_rating_list, ArrayType(IntegerType()))

# 4. Get the number of unique items
num_items = df.agg(countDistinct('item').alias('num_items')).collect()[0].num_items

# 5. Apply the UDF
result = grouped_df.withColumn('rating_arr', rating_list_udf(col('item_ratings'), lit(num_items)))
# result.show(20, truncate=False)

You may need to add extra logic to the UDF to handle the case where there are n unique items but some item has a value > n; as written, you would get an IndexError.
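Note that the sample data itself hits this case: there are only 6 distinct items, but item numbers go up to 7, so countDistinct sizes the list too small. A minimal adjustment, assuming item numbers start at 1, is to size the list by the maximum item number instead of the distinct count:

# Use the largest item number rather than the distinct count, so every
# index k - 1 written by the UDF fits (assumes item numbers start at 1)
from pyspark.sql.functions import max as max_
num_items = df.agg(max_('item').alias('num_items')).collect()[0].num_items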


i2byvkas2#

Try this for Spark 2.4+. Using a window partitioned by a literal lets us keep the data distributed and compute the max/min without a collect action.

df.show()  # sample dataframe
# +---+----+------+
# |id |item|rating|
# +---+----+------+
# |1  |1   |5     |
# |1  |2   |4     |
# |1  |4   |5     |
# |1  |7   |3     |
# |2  |5   |3     |
# |2  |2   |5     |
# |2  |3   |5     |
# +---+----+------+

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Window over the whole dataframe (partitioned by a constant literal)
w = Window().partitionBy(F.lit(0))

# 1. items: the full sequence 1..max(item), computed without a collect
# 2. Per id, collect the item and rating lists (plus one copy of items)
# 3. Append the missing items (array_except) after the observed ones, zip
#    with the ratings so missing items get a null rating, and sort by item
# 4. Replace the nulls with 0
df.withColumn("items", F.sequence(F.lit(1), F.max("item").over(w), F.lit(1)))\
  .groupBy("id").agg(F.collect_list("item").alias("item"),
                     F.collect_list("rating").alias("rating"),
                     F.first("items").alias("items"))\
  .withColumn("rating",
              F.sort_array(F.arrays_zip(F.flatten(F.array("item", F.array_except("items", "item"))), "rating")))\
  .select("id", F.expr("""transform(rating.rating, x -> IF(x is null, 0, x))""").alias("rating_list")).show(truncate=False)

# +---+---------------------+
# |id |rating_list          |
# +---+---------------------+
# |1  |[5, 4, 0, 5, 0, 0, 3]|
# |2  |[0, 5, 5, 0, 3, 0, 0]|
# +---+---------------------+
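To see why the zip-and-sort step works, it can help to materialize the intermediate array before the final transform; a sketch reusing df, F, and w from above:

# Missing items appended by array_except have no matching rating, so
# arrays_zip pads their rating field with null; sort_array then orders
# the structs by item number
inter = df.withColumn("items", F.sequence(F.lit(1), F.max("item").over(w), F.lit(1)))\
    .groupBy("id").agg(F.collect_list("item").alias("item"),
                       F.collect_list("rating").alias("rating"),
                       F.first("items").alias("items"))\
    .withColumn("zipped", F.sort_array(F.arrays_zip(
        F.flatten(F.array("item", F.array_except("items", "item"))), "rating")))
inter.select("id", "zipped").show(truncate=False)
# for id=1 the zipped structs are roughly [(1,5), (2,4), (3,null), (4,5), (5,null), (6,null), (7,3)]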

7uzetpgm3#

Here is another solution, based on the observation that max(item) == max_array_length; let me know if that assumption doesn't hold.

from pyspark.sql.functions import expr, collect_list, max, sequence, lit

# max item implies max array length
maxi = df.select(max("item").alias("maxi")).first()["maxi"]

df = df.groupBy("id").agg(
      collect_list("item").alias("items"),
      collect_list("rating").alias("ratings")
).withColumn("idx", sequence(lit(1), lit(maxi)))

# we are projecting an array[K] into an array[N] where K <= N
rating_expr = expr("""transform(idx, i -> if(array_position(items, i) >= 1,
                                             ratings[array_position(items, i) - 1],
                                             0))""")

df.select(df.id, rating_expr.alias("rating_list")).show(truncate=False)

# +---+---------------------+
# |id |rating_list          |
# +---+---------------------+
# |1  |[5, 4, 0, 5, 0, 0, 3]|
# |2  |[0, 5, 5, 0, 3, 0, 0]|
# +---+---------------------+

Analysis: iterate over idx; if the current item i exists in items, use its position to fetch the rating with ratings[array_position(items, i) - 1], otherwise use 0.
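As a quick sanity check of the builtin this relies on (a sketch assuming an active SparkSession named spark): array_position is 1-based and returns 0 when the element is absent, which is why the expression subtracts 1 before indexing into ratings with the 0-based [] accessor.

# array_position is 1-based; 0 means "not found"
spark.sql("SELECT array_position(array(1,2,4,7), 4) AS hit, "
          "array_position(array(1,2,4,7), 6) AS miss").show()
# +---+----+
# |hit|miss|
# +---+----+
# |  3|   0|
# +---+----+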
