我想在数据库中查找每个类型的平均评分和该类型电影的总数。然而,数据库的组织方式如下:columns = [movieId,title,rating,genres],其中genres列是适用于电影的流派列表(最多6项)。我如何转换数据库,以便索引列是没有'for'循环的流派,因为此时我正在访问列表中的每个元素。
以下是我的代码:
import pyspark
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import max,split,avg,count,col,sum,concat_ws
spark = SparkSession.builder.appName("APISpark").getOrCreate()
ratings = spark.read.option("header","true").csv("input/ml25m/ratings.csv").drop("userId","timestamp")
movies = spark.read.option("header","true").csv("input/ml-25m/movies.csv")
movies = movies.withColumn('genre', split(movies['genres'], '\|') ).drop('genres')
A = movies.join(ratings,ratings["movieId"]==movies["movieId"]).drop('movieId')
F = A.groupBy(col("genre")[0]).agg(sum("rating").alias('s0'), count("title").alias('c0'))
F1 = A.groupBy(col("genre")[1]).agg(sum("rating").alias('s1'), count("title").alias('c1'))
F2 = A.groupBy(col("genre")[2]).agg(sum("rating").alias('s2'), count("title").alias('c2'))
F3 = A.groupBy(col("genre")[3]).agg(sum("rating").alias('s3'), count("title").alias('c3'))
F4 = A.groupBy(col("genre")[4]).agg(sum("rating").alias('s4'), count("title").alias('c4'))
F5 = A.groupBy(col("genre")[5]).agg(sum("rating").alias('s5'), count("title").alias('c5'))
F = F.join(F1,F['genre[0]']==F1['genre[1]'],"left").drop('genre[1]')\
.join(F2,F2['genre[2]']==F['genre[0]'],"left").drop('genre[2]')\
.join(F3,F3['genre[3]']==F['genre[0]'],"left").drop('genre[3]')\
.join(F4,F4['genre[4]']==F['genre[0]'],"left").drop('genre[4]')\
.join(F5,F5['genre[5]']==F['genre[0]'],"left").drop('genre[5]').fillna(0)
F = F.select(F[0].alias('Genres'),((F[1]+F[3]+F[5]+F[7]+F[9]+F[11])/(F[2]+F[4]+F[6]+F[8]+F[10]+F[12])).alias('Average_Rating'),(F[2]+F[4]+F[6]+F[8]+F[10]+F[12]).alias('Count'))
F.select(concat_ws(",",col("Genres"),col("Average_Rating"),col("Count")).alias("genre_averagerating_Promedio_reviews")).write.text("3_out")
1条答案
按热度按时间kmbjn2e31#
有一个更好的办法……好得多的办法
设置
解决方案
关键的一步是
explode
类型列表,将每个类型复制为单独的一行,然后您可以按genre
对 Dataframe 进行分组并进行聚合。结果