Efficiently computing item-item collaborative-filtering similarities with numba, polars and numpy

tvz2xvvm asked 11 months ago

Disclaimer: this question is part of a thread that includes these two SO questions (q1, q2).

The data are movie ratings similar to the ratings.csv file (~891 MB) of the ml-latest dataset.
First, I read the CSV file with the polars library as follows:

import os
import numpy as np
import polars as pl

movie_ratings = pl.read_csv(os.path.join(application_path + data_directory, "ratings.csv"))


  • Assume we want to compute the similarity between the movies user = 1 has seen (e.g. 62 movies) and the rest of the movies in the dataset. FYI, the dataset has ~83,000 movies, so for each other_movie (82,938 of them) we compute the similarity with every movie seen by user 1 (62 movies). The complexity is 62 x 82,938 iterations.

For this example, the reported benchmarks cover only 400 of the 82,938 other_movies.
To do this, I created two polars frames: one containing the other_movies (~82,938 rows) and one containing only the movies the user has watched (62 rows).

user_ratings = movie_ratings.filter(pl.col("userId")==input_id) #input_id = 1 (data related to user 1)
user_rated_movies = list(user_ratings.select(pl.col("movieId")).to_numpy().ravel()) #movies seen by user1
potential_movies_to_recommend = list(
    movie_ratings.select("movieId").filter( ~(pl.col("movieId").is_in(user_rated_movies)) ).unique().sort("movieId").to_numpy().ravel()
)

items_metadata = (
    movie_ratings.filter(
        ~pl.col("movieId").is_in(user_rated_movies) #& pl.col("movieId").is_in(potential_movie_recommendations[:total_unseen_movies])
    )
    .group_by("movieId").agg(
        users_seen_movie=pl.col("userId").unique(),
        user_ratings=pl.col("rating")
    )
)

target_items_metadata = (
    movie_ratings.filter(
        pl.col("movieId").is_in(user_rated_movies) #& pl.col("movieId").is_in(potential_movie_recommendations[:total_unseen_movies])
    ).group_by("movieId").agg(
        users_seen_movie=pl.col("userId").unique(),
        user_ratings=pl.col("rating")
    )
)


The result is two polars frames with rows (movies) and columns (the users who have seen each movie and each user's ratings).
The first frame contains only the other_movies, which we can recommend to user 1 since he/she has not seen them.
The second frame contains only the movies the user has seen.
Next, my approach is to iterate over each row of the first frame by applying a UDF function.

item_metadata_similarity = (
    items_metadata.with_columns(
        similarity_score=pl.struct(pl.all()).map_elements(
            lambda row: item_compute_similarity_scoring_V2(row, similarity_metric, target_items_metadata),
            return_dtype=pl.List(pl.List(pl.Float64)),
            strategy="threading"
        )
    )
)


where item_compute_similarity_scoring_V2 is defined as:

def item_compute_similarity_scoring_V2(
    row,
    similarity_metric,
    target_movies_metadata:pl.DataFrame
):
    users_item1 = np.asarray(row["users_seen_movie"])
    ratings_item1 = np.asarray(row["user_ratings"])
    computed_similarity: list=[]
    for row2 in target_movies_metadata.iter_rows(named=True): #iter over each row from the second dataframe with the movies seen by the user.
        users_item2=np.asarray(row2["users_seen_movie"])
        ratings_item2=np.asarray(row2["user_ratings"])
        r1, r2 = item_ratings(users_item1, ratings_item1, users_item2, ratings_item2)
        if r1.shape[0] != 0 and r2.shape[0] != 0:
            similarity_score = compute_similarity_score(r1, r2)
            if similarity_score > 0.0: #filter out negative or zero similarity scores
                computed_similarity.append((row2["movieId"], similarity_score))
    most_similar_pairs = sorted(computed_similarity, key=lambda x: x[1], reverse=True)
    return most_similar_pairs


item_ratings and compute_similarity_score are defined as:

from numba import jit

def item_ratings(u1:np.ndarray, r1:np.ndarray, u2:np.ndarray, r2:np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    common_elements, indices1, indices2 = np.intersect1d(u1, u2, return_indices=True)
    sr1 = r1[indices1]
    sr2 = r2[indices2]
    assert len(sr1)==len(sr2), "ratings don't have same lengths"
    return sr1, sr2

@jit(nopython=True, parallel=True)
def compute_similarity_score(array1:np.ndarray, array2:np.ndarray) -> float:
    assert(array1.shape[0] == array2.shape[0])
    a1a2 = 0
    a1a1 = 0
    a2a2 = 0
    for i in range(array1.shape[0]):
        a1a2 += array1[i]*array2[i]
        a1a1 += array1[i]*array1[i]
        a2a2 += array2[i]*array2[i]
    cos_theta = 1.0
    if a1a1!=0 and a2a2!=0:
        cos_theta = float(a1a2/np.sqrt(a1a1*a2a2))
    return cos_theta
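
For reference, the loop above is just a cosine similarity restricted to the co-rated users; a vectorized numpy equivalent (a sketch for comparison only, not part of the benchmarked code) would be:

def cosine_similarity_np(r1: np.ndarray, r2: np.ndarray) -> float:
    # same formula as compute_similarity_score, written with numpy reductions
    denom = np.sqrt(np.dot(r1, r1) * np.dot(r2, r2))
    return float(np.dot(r1, r2) / denom) if denom != 0.0 else 1.0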


Basically, the function iterates over each row of the second frame and, for each row, computes the similarity between the other_movie and a movie seen by the user. So for 400 movies we perform 400 * 62 iterations, producing 62 similarity scores per other_movie.
The result of each computation is an array with the schema [[1, 0.20], [110, 0.34]], ... (62 pairs per other_movie).
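
If a flat table is preferred over the nested list column, the pairs can be exploded afterwards; a sketch assuming the column names produced above (flat_similarity is an illustrative name):

flat_similarity = (
    item_metadata_similarity
    .select("movieId", "similarity_score")
    .explode("similarity_score")  # one [user_movie, score] pair per row
    .with_columns(
        user_movie=pl.col("similarity_score").list.get(0).cast(pl.Int64),
        score=pl.col("similarity_score").list.get(1),
    )
    .drop("similarity_score")
)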



Benchmark for 400 movies

  1. INFO - ITEM-ITEM: similarity scores for 400 movies computed in 0:05:49.887032 (≈6 minutes).
  2. 5 GB of RAM used.

I would like to know how to improve parallelism, either with native polars commands or by leveraging the numba framework.
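
As a sketch of the numba direction (illustrative names, not benchmarked): if the per-movie user id and rating arrays are packed into flat arrays with an offsets index, and the user ids are kept sorted, the np.intersect1d step can be replaced by a two-pointer merge and the loop over the user's 62 movies can be parallelized with prange:

import numpy as np
from numba import njit, prange

@njit(parallel=True)
def similarities_for_movie(u1, r1, users_flat, ratings_flat, offsets):
    # u1, r1: sorted user ids and ratings for one other_movie
    # users_flat / ratings_flat: the user's rated movies concatenated back to back;
    # offsets[m]:offsets[m + 1] slices out movie m
    n_movies = offsets.shape[0] - 1
    out = np.zeros(n_movies, dtype=np.float64)
    for m in prange(n_movies):
        u2 = users_flat[offsets[m]:offsets[m + 1]]
        r2 = ratings_flat[offsets[m]:offsets[m + 1]]
        i = 0
        j = 0
        a1a2 = 0.0
        a1a1 = 0.0
        a2a2 = 0.0
        while i < u1.shape[0] and j < u2.shape[0]:
            if u1[i] == u2[j]:  # co-rated user
                a1a2 += r1[i] * r2[j]
                a1a1 += r1[i] * r1[i]
                a2a2 += r2[j] * r2[j]
                i += 1
                j += 1
            elif u1[i] < u2[j]:
                i += 1
            else:
                j += 1
        if a1a1 != 0.0 and a2a2 != 0.0:
            out[m] = a1a2 / np.sqrt(a1a1 * a2a2)
    return out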

Update - second approach, using the to_numpy() operation instead of iter_rows() and map_elements()

user_ratings = movie_ratings.filter(pl.col("userId")==input_id) #input_id = 1
user_rated_movies = user_ratings.select(pl.col("movieId")).to_numpy().ravel()
potential_movies_to_recommend = list(
    movie_ratings.select("movieId").filter( ~(pl.col("movieId").is_in(user_rated_movies)) ).unique().sort("movieId").to_numpy().ravel()
)
items_metadata = (
    movie_ratings.filter(
        ~pl.col("movieId").is_in(user_rated_movies)
    )
)
# print(items_metadata.head(5))
target_items_metadata = (
    movie_ratings.filter(
        pl.col("movieId").is_in(user_rated_movies)
    )
)
# print(target_items_metadata.head(5))


In this second approach, items_metadata and target_items_metadata are two large polars tables.
My next step was to convert both tables into numpy.ndarrays with the to_numpy() command.

items_metadata_array = items_metadata.to_numpy()
target_items_metadata_array = target_items_metadata.to_numpy()
computed_similarity_scores:dict = {}
for i, other_movie in enumerate(potential_movies_to_recommend[:400]): #take the first 400 unseen movies by user 1
    mask = items_metadata_array[:, 1] == other_movie
    other_movies_chunk = items_metadata_array[mask]
    u1 = other_movies_chunk[:,0].astype(np.int32)
    r1 = other_movies_chunk[:,2].astype(np.float32)
    computed_similarity: list=[]
    for user_movie in user_rated_movies:
        mask = target_items_metadata_array[:, 1] == user_movie
        target_movie_chunk = target_items_metadata_array[mask]
        u2 = target_movie_chunk[:,0].astype(np.int32)
        r2 = target_movie_chunk[:,2].astype(np.float32)
        common_r1, common_r2 = item_ratings(u1, r1, u2, r2)
        if common_r1.shape[0] != 0 and common_r2.shape[0] != 0:
            similarity_score = compute_similarity_score(common_r1, common_r2)
            if similarity_score > 0.0:
                computed_similarity.append((user_movie, similarity_score))
    most_similar_pairs = sorted(computed_similarity, key=lambda x: x[1], reverse=True)[:k_similar_user]
    computed_similarity_scores[str(other_movie)] = most_similar_pairs
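
A side note on the masking above: every items_metadata_array[:, 1] == other_movie comparison scans the full array. A sketch of an alternative (illustrative names) that sorts once by movieId and then slices each movie's block with binary search:

# sort the ratings once by movieId, then look up contiguous blocks
order = np.argsort(items_metadata_array[:, 1], kind="stable")
sorted_items = items_metadata_array[order]
sorted_movie_ids = sorted_items[:, 1]

def movie_block(movie_id):
    # two binary searches give the [lo, hi) range of rows for one movieId
    lo = np.searchsorted(sorted_movie_ids, movie_id, side="left")
    hi = np.searchsorted(sorted_movie_ids, movie_id, side="right")
    return sorted_items[lo:hi]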

Benchmark for the second approach (8.50 minutes, vs. 6 minutes for the first approach)

  • ITEM-ITEM: similarity scores for 400 movies computed in 0:08:50.537102

Update - third approach, using the iter_rows() operation

In my third approach I got better results than with the two previous ones; for user 1 and 400 movies I get the results in roughly 2 minutes.

items_metadata = (
    movie_ratings.filter(
        ~pl.col("movieId").is_in(user_rated_movies)
    )
    .group_by("movieId").agg(
        users_seen_movie=pl.col("userId").unique(),
        user_ratings=pl.col("rating")
    )
)

target_items_metadata = (
    movie_ratings.filter(
        pl.col("movieId").is_in(user_rated_movies)
    ).group_by("movieId").agg(
        users_seen_movie=pl.col("userId").unique(),
        user_ratings=pl.col("rating")
    )
)


items_metadata is the metadata of the other_movies not seen by user 1.
target_items_metadata is the metadata of the movies rated by user 1. The term metadata refers to the two aggregated .agg() columns, users_seen_movie and user_ratings.
Finally, I create two for loops using polars' iter_rows() method:

def cosine_similarity_score(array1:np.ndarray, array2:np.ndarray) -> float:
    assert(array1.shape[0] == array2.shape[0])
    a1a2 = 0
    a1a1 = 0
    a2a2 = 0
    for i in range(array1.shape[0]):
        a1a2 += array1[i]*array2[i]
        a1a1 += array1[i]*array1[i]
        a2a2 += array2[i]*array2[i]
    # cos_theta = 1.0
    cos_theta = 0.0
    if a1a1!=0 and a2a2!=0:
        cos_theta = float(a1a2/np.sqrt(a1a1*a2a2))
    return max(0.0, cos_theta)

computed_similarity_scores: dict = {}
for row1 in items_metadata.iter_rows():
    computed_similarity: list = []
    for row2 in target_items_metadata.iter_rows():
        r1, r2 = item_ratings(np.asarray(row1[1]), np.asarray(row1[2]), np.asarray(row2[1]), np.asarray(row2[2]))
        similarity_score = 0.0  # reset so an empty overlap does not reuse the previous pair's score
        if r1.shape[0] != 0 and r2.shape[0] != 0:
            similarity_score = cosine_similarity_score(r1, r2)
        computed_similarity.append((row2[0], similarity_score))
    computed_similarity_scores[str(row1[0])] = sorted(computed_similarity, key=lambda x: x[1], reverse=True)[:k_similar_user]


Benchmark for 400 movies

  1. INFO - ITEM-ITEM: similarity scores for 400 movies computed in 0:01:50 (≈2 minutes).
  2. 4.5 GB of RAM used.

njthzxwz1#

I'm not too familiar with numba, so before trying to compare timings, the first thing I would do is create a "fully native" Polars approach as a baseline:
This is a direct translation of the current approach (i.e. it still contains the "double for loop"), so it is only intended as a baseline.
Because it uses the Lazy API, nothing inside the loop is computed there and then.
It all happens when .collect() is called (which allows Polars to parallelize the work).
The > 0.0 filtering of similarity_score would be done after the results have been collected.

input_id = 1

is_user_rating = pl.col("userId") == input_id

can_recommend = (
    pl.col("movieId").is_in(pl.col("movieId").filter(is_user_rating)).not_()
)

cosine_similarity = (
    pl.col('rating').dot('rating_right') /  
    ( pl.col('rating').pow(2).sum().sqrt() * 
      pl.col('rating_right').pow(2).sum().sqrt() ) 
)

user_rated_movies = movie_ratings.filter(is_user_rating).select("movieId").to_series()

potential_movies_to_recommend = (
    movie_ratings.filter(can_recommend).select(pl.col("movieId").unique().sort())
)

# use the Lazy API so we can compute in parallel
df = movie_ratings.lazy()

computed_similarity_scores = []
for other_movie in potential_movies_to_recommend.head(1).to_series(): # .head(N) potential movies
    for user_movie in user_rated_movies:
        score = (
            df.filter(pl.col("movieId") == user_movie)
              .join(
                 df.filter(pl.col("movieId") == other_movie),
                 on = "userId"
              )
              .select(cosine = cosine_similarity)
              .select(user_movie=user_movie, other_movie=other_movie, similarity_score="cosine")
        )
        computed_similarity_scores.append(score)
        
# All scores are computed in parallel
computed_similarity_scores_polars = pl.concat(computed_similarity_scores).collect()
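
The > 0.0 filter mentioned above can then be applied to the collected frame, for example:

computed_similarity_scores_polars = computed_similarity_scores_polars.filter(
    pl.col("similarity_score") > 0.0
)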

Testing with .head(100) I get a 58s runtime compared to a 111s runtime for your example; memory consumption is the same.

duckdb

For comparison, the duckdb version with .head(400) runs in 5s:

import duckdb

df = duckdb.sql("""
with 
   df     as (from 'imdb.parquet'),
   user   as (from df where movieId in (from df select movieId where userId = 1)),
   movies as (from df where movieId not in (from df select movieId where userId = 1)),
   other  as (from df where movieId in (from movies select distinct movieId order by movieId limit 400))
   
from
   user join other using (userId)
   
select   
   user.movieId user_movie,
   other.movieId other_movie,
   list_cosine_similarity(
      list(user.rating), list(other.rating)
   ) similarity_score
   
group by 
   user_movie, other_movie   
order by 
   user_movie, other_movie
""").pl()
shape: (24_764, 3)
┌────────────┬─────────────┬──────────────────┐
│ user_movie ┆ other_movie ┆ similarity_score │
│ ---        ┆ ---         ┆ ---              │
│ i64        ┆ i64         ┆ f64              │
╞════════════╪═════════════╪══════════════════╡
│ 1          ┆ 2           ┆ 0.95669          │
│ 1          ┆ 3           ┆ 0.941348         │
│ 1          ┆ 4           ┆ 0.92169          │
│ 1          ┆ 5           ┆ 0.943999         │
│ …          ┆ …           ┆ …                │
│ 54259      ┆ 407         ┆ 0.941241         │
│ 54259      ┆ 408         ┆ 0.934745         │
│ 54259      ┆ 409         ┆ 0.937361         │
│ 54259      ┆ 410         ┆ 0.94937          │
└────────────┴─────────────┴──────────────────┘
Elapsed time: 5.02638 seconds
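
Note that the query reads the ratings from a Parquet file ('imdb.parquet' here); one way to produce it from the frame loaded earlier (a sketch) is:

movie_ratings.write_parquet("imdb.parquet")
# recent duckdb versions can also scan the in-memory polars frame directly, e.g. `from movie_ratings`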

