python-3.x 有没有办法避免在这个Pyspark代码中使用'for'循环

lymnna71  于 2023-04-08  发布在  Python
关注(0)|答案(1)|浏览(94)

我想在数据库中查找每个类型的平均评分和该类型电影的总数。然而,数据库的组织方式如下:columns = [movieId,title,rating,genres],其中genres列是适用于电影的流派列表(最多6项)。我如何转换数据库,以便索引列是没有'for'循环的流派,因为此时我正在访问列表中的每个元素。
以下是我的代码:

import pyspark
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import max,split,avg,count,col,sum,concat_ws
spark = SparkSession.builder.appName("APISpark").getOrCreate()

ratings = spark.read.option("header","true").csv("input/ml25m/ratings.csv").drop("userId","timestamp")
movies = spark.read.option("header","true").csv("input/ml-25m/movies.csv")

movies = movies.withColumn('genre', split(movies['genres'], '\|') ).drop('genres')

A = movies.join(ratings,ratings["movieId"]==movies["movieId"]).drop('movieId')

F = A.groupBy(col("genre")[0]).agg(sum("rating").alias('s0'), count("title").alias('c0'))
F1 = A.groupBy(col("genre")[1]).agg(sum("rating").alias('s1'), count("title").alias('c1'))
F2 = A.groupBy(col("genre")[2]).agg(sum("rating").alias('s2'), count("title").alias('c2'))
F3 = A.groupBy(col("genre")[3]).agg(sum("rating").alias('s3'), count("title").alias('c3'))
F4 = A.groupBy(col("genre")[4]).agg(sum("rating").alias('s4'), count("title").alias('c4'))
F5 = A.groupBy(col("genre")[5]).agg(sum("rating").alias('s5'), count("title").alias('c5'))

F = F.join(F1,F['genre[0]']==F1['genre[1]'],"left").drop('genre[1]')\
     .join(F2,F2['genre[2]']==F['genre[0]'],"left").drop('genre[2]')\
     .join(F3,F3['genre[3]']==F['genre[0]'],"left").drop('genre[3]')\
     .join(F4,F4['genre[4]']==F['genre[0]'],"left").drop('genre[4]')\
     .join(F5,F5['genre[5]']==F['genre[0]'],"left").drop('genre[5]').fillna(0)

F = F.select(F[0].alias('Genres'),((F[1]+F[3]+F[5]+F[7]+F[9]+F[11])/(F[2]+F[4]+F[6]+F[8]+F[10]+F[12])).alias('Average_Rating'),(F[2]+F[4]+F[6]+F[8]+F[10]+F[12]).alias('Count'))
F.select(concat_ws(",",col("Genres"),col("Average_Rating"),col("Count")).alias("genre_averagerating_Promedio_reviews")).write.text("3_out")
kmbjn2e3

kmbjn2e31#

有一个更好的办法……好得多的办法

设置

A.show()

+-------+------+------+------------+
|movieId| title|rating|      genres|
+-------+------+------+------------+
|      1|movie1|     6|   [a, b, c]|
|      2|movie2|     2|      [b, c]|
|      3|movie3|     8|         [c]|
|      4|movie4|     6|[a, b, c, d]|
+-------+------+------+------------+

解决方案

关键的一步是explode类型列表,将每个类型复制为单独的一行,然后您可以按genre对 Dataframe 进行分组并进行聚合。

result = (
    A
    .withColumn('genre', F.explode('genre'))
    .groupBy('genre').agg(
        F.count('movieId').alias('count'),
        F.mean('rating').alias('avg_rating'),
    )
)

结果

result.show()

+------+-----+-----------------+
|genre |count|       avg_rating|
+------+-----+-----------------+
|     c|    4|              5.5|
|     b|    3|4.666666666666667|
|     a|    2|              6.0|
|     d|    1|              6.0|
+------+-----+-----------------+

相关问题