免责声明该问题是包括这两个SO问题(q1,q2)的线程的一部分
这些数据类似于ml-latest数据集的ratings.csv file(~ 891 mb)中的电影评级。
有一次我用polars
库读取了csv文件,如下所示:
movie_ratings = pl.read_csv(os.path.join(application_path + data_directory, "ratings.csv"))
字符串
- 假设我们要计算user=1看过的电影(例如62部电影)与数据集中其余电影之间的相似度。仅供参考,数据集有约83,000部电影,因此对于each other_movie(82,938),计算与user 1看过的每部电影(62部电影)的相似度。复杂度为62 x82938(迭代)。*
对于本例,报告的基准测试仅适用于400/82,938 other_movies
为此,我创建了两个polars
框架,其中一个框架包含other_movies
(约82,938行),另一个框架仅包含用户观看的电影(62行)。
user_ratings = movie_ratings.filter(pl.col("userId")==input_id) #input_id = 1 (data related to user 1)
user_rated_movies = list(user_ratings.select(pl.col("movieId")).to_numpy().ravel()) #movies seen by user1
potential_movies_to_recommend = list(
movie_ratings.select("movieId").filter( ~(pl.col("movieId").is_in(user_rated_movies)) ).unique().sort("movieId").to_numpy().ravel()
)
items_metadata = (
movie_ratings.filter(
~pl.col("movieId").is_in(user_rated_movies) #& pl.col("movieId").is_in(potential_movie_recommendations[:total_unseen_movies])
)
.group_by("movieId").agg(
users_seen_movie=pl.col("userId").unique(),
user_ratings=pl.col("rating")
)
)
target_items_metadata = (
movie_ratings.filter(
pl.col("movieId").is_in(user_rated_movies) #& pl.col("movieId").is_in(potential_movie_recommendations[:total_unseen_movies])
).group_by("movieId").agg(
users_seen_movie=pl.col("userId").unique(),
user_ratings=pl.col("rating")
)
)
型
结果是两个polars
嵌套框,行(电影)和列(用户看过的电影和每个用户的评分)。
x1c 0d1x的数据
第一个框架只包含other_movies
,我们可以向user 1推荐,如果他/她没有看到它们。
第二个框架只包含用户看过的电影。
接下来,我的方法是通过应用一个UDF函数来覆盖第一个框架的每一行。
item_metadata_similarity = (
items_metadata.with_columns(
similarity_score=pl.struct(pl.all()).map_elements(
lambda row: item_compute_similarity_scoring_V2(row, similarity_metric, target_items_metadata),
return_dtype=pl.List(pl.List(pl.Float64)),
strategy="threading"
)
)
)
型
其中item_compute_similarity_scoring_V2
定义为:
def item_compute_similarity_scoring_V2(
row,
target_movies_metadata:pl.DataFrame
):
users_item1 = np.asarray(row["users_seen_movie"])
ratings_item1 = np.asarray(row["user_ratings"])
computed_similarity: list=[]
for row2 in target_movies_metadata.iter_rows(named=True): #iter over each row from the second dataframe with the movies seen by the user.
users_item2=np.asarray(row2["users_seen_movie"])
ratings_item2=np.asarray(row2["user_ratings"])
r1, r2 = item_ratings(users_item1, ratings_item1, users_item2, ratings_item2)
if r1.shape[0] != 0 and r2.shape[0] != 0:
similarity_score = compute_similarity_score(r1, r2)
if similarity_score > 0.0: #filter out negative or zero similarity scores
computed_similarity.append((row2["movieId"], similarity_score))
most_similar_pairs = sorted(computed_similarity, key=lambda x: x[1], reverse=True)
return most_similar_pairs
型
,item_ratings
& compute_similarity_score
定义为
def item_ratings(u1:np.ndarray, r1:np.ndarray, u2:np.ndarray, r2:np.ndarray) -> (np.ndarray, np.ndarray):
common_elements, indices1, indices2 = np.intersect1d(u1, u2, return_indices=True)
sr1 = r1[indices1]
sr2 = r2[indices2]
assert len(sr1)==len(sr2), "ratings don't have same lengths"
return sr1, sr2
@jit(nopython=True, parallel=True)
def compute_similarity_score(array1:np.ndarray, array2:np.ndarray) -> float:
assert(array1.shape[0] == array2.shape[0])
a1a2 = 0
a1a1 = 0
a2a2 = 0
for i in range(array1.shape[0]):
a1a2 += array1[i]*array2[i]
a1a1 += array1[i]*array1[i]
a2a2 += array2[i]*array2[i]
cos_theta = 1.0
if a1a1!=0 and a2a2!=0:
cos_theta = float(a1a2/np.sqrt(a1a1*a2a2))
return cos_theta
型
基本上,该函数迭代第二个框架的每一行,并为每一行计算other_movie
和用户所看电影之间的相似度。因此,对于400部电影,我们进行400*62次迭代,为每个other_movie
生成62个相似度分数。
每次计算的结果都是一个模式为[[1, 0.20], [110, 0.34]]...
的数组(每个other_movie
长度为62对)
的
400部电影的基准
- INFO -项目-项目:在0:05:49.887032中计算的400部电影的相似性分数
1.~2分钟。 - 5GB的RAM使用
我想知道如何通过使用原生polars
命令或利用numba
框架来提高并行性。
更新-第二种方法使用to_numpy()
操作,不使用iter_rows()
和map_elements()
user_ratings = movie_ratings.filter(pl.col("userId")==input_id) #input_id = 1
user_rated_movies = user_ratings.select(pl.col("movieId")).to_numpy().ravel()
potential_movies_to_recommend = list(
movie_ratings.select("movieId").filter( ~(pl.col("movieId").is_in(user_rated_movies)) ).unique().sort("movieId").to_numpy().ravel()
)
items_metadata = (
movie_ratings.filter(
~pl.col("movieId").is_in(user_rated_movies)
)
)
# print(items_metadata.head(5))
target_items_metadata = (
movie_ratings.filter(
pl.col("movieId").is_in(user_rated_movies)
)
)
# print(target_items_metadata.head(5))
型
在第二种方法中,items_metadata
和target_items_metadata
是两个大的极坐标表。
然后,我的下一步是使用to_numpy()
命令将这两个表保存到numpy.ndarrays
中。
items_metadata_array = items_metadata.to_numpy()
target_items_metadata_array = target_items_metadata.to_numpy()
computed_similarity_scores:dict = {}
for i, other_movie in enumerate(potential_movies_to_recommend[:400]): #take the first 400 unseen movies by user 1
mask = items_metadata_array[:, 1] == other_movie
other_movies_chunk = items_metadata_array[mask]
u1 = other_movies_chunk[:,0].astype(np.int32)
r1 = other_movies_chunk[:,2].astype(np.float32)
computed_similarity: list=[]
for i, user_movie in enumerate(user_rated_movies):
print(user_movie)
mask = target_items_metadata_array[:, 1] == user_movie
target_movie_chunk = target_items_metadata_array[mask]
u2 = target_movie_chunk[:,0].astype(np.int32)
r2 = target_movie_chunk[:,2].astype(np.float32)
common_r1, common_r2 = item_ratings(u1, r1, u2, r2)
if common_r1.shape[0] != 0 and common_r2.shape[0] != 0:
similarity_score = compute_similarity_score(common_r1, common_r2)
if similarity_score > 0.0:
computed_similarity.append((user_movie, similarity_score))
most_similar_pairs = sorted(computed_similarity, key=lambda x: x[1], reverse=True)[:k_similar_user]
computed_similarity_scores[str(other_movie)] = most_similar_pairs
第二种方法的基准(8.50分钟>第一种方法的6分钟)
- 项目-项目:在0:08:50.537102中计算400部电影的相似性分数
更新-使用iter_rows()
操作的第三种方法
在我的第三种方法中,我比前两种方法得到了更好的结果,对于用户1和400部电影,我在大约2分钟内得到了结果。
items_metadata = (
movie_ratings.filter(
~pl.col("movieId").is_in(user_rated_movies)
)
.group_by("movieId").agg(
users_seen_movie=pl.col("userId").unique(),
user_ratings=pl.col("rating")
)
)
target_items_metadata = (
movie_ratings.filter(
pl.col("movieId").is_in(user_rated_movies)
).group_by("movieId").agg(
users_seen_movie=pl.col("userId").unique(),
user_ratings=pl.col("rating")
)
)
型items_metadata
是用户1未看到的other_movies
的元数据。target_items_metadata
用户1评分的电影的元数据。术语元数据指的是两个聚合的.agg()
列,users_seen_movie
和user_ratings
最后,我使用polars
的iter_rows()
方法创建了两个for循环
def cosine_similarity_score(array1:np.ndarray, array2:np.ndarray) -> float:
assert(array1.shape[0] == array2.shape[0])
a1a2 = 0
a1a1 = 0
a2a2 = 0
for i in range(array1.shape[0]):
a1a2 += array1[i]*array2[i]
a1a1 += array1[i]*array1[i]
a2a2 += array2[i]*array2[i]
# cos_theta = 1.0
cos_theta = 0.0
if a1a1!=0 and a2a2!=0:
cos_theta = float(a1a2/np.sqrt(a1a1*a2a2))
return max(0.0, cos_theta)
for row1 in item_metadata.iter_rows():
computed_similarity: list= []
for row2 in target_items_metadata.iter_rows():
r1, r2 = item_ratings(np.asarray(row1[1]), np.asarray(row1[2]), np.asarray(row2[1]), np.asarray(row2[2]))
if r1.shape[0]!=0 and r2.shape[0]!=0:
similarity_score = cosine_similarity_score(r1, r2)
computed_similarity.append((row2[0], similarity_score if similarity_score > 0 else 0))
computed_similarity_scores[str(row1[0])] = sorted(computed_similarity, key=lambda x: x[1], reverse=True)[:k_similar_user]
型
400部电影的基准
- INFO -项目-项目:在0:01:50中计算的400部电影的相似性分数
1.~2分钟。
1.使用了4.5GB的RAM。
1条答案
按热度按时间njthzxwz1#
我不太熟悉numba,所以在尝试比较计时之前,我要做的第一件事是创建一个“完全原生”的Polars方法:
这是当前方法的直接翻译(即它仍然包含“双for循环”),因此它只是作为基线尝试。
因为它使用了Lazy API,所以循环中的任何东西都不会被计算。
这一切都是在调用
.collect()
时完成的(这允许Polars并行化工作)。similarity_score的
> 0.0
过滤将在收集结果后完成。个字符
测试
.head(100)
我得到58s
运行时相比,111s
运行时为您的例子,内存消耗是相同的。duckdb
作为比较,带有
.head(400)
的duckdb运行在5s
中的字符串