numpy 在Python中加速稀疏交叉差分

ruyhziif  于 11个月前  发布在  Python
关注(0)|答案(1)|浏览(119)

我需要计算两组向量之间的成对距离,但我只需要知道这些成对距离的一小部分,当我的两组向量足够大时,这个比例将小于0.01。
这是我当前的解决方案,其中包含一个小示例。AB是两组向量,M存储我需要保留的向量对。AB中的行数通常要大得多还值得一提的是,M在每行中至少有一个非零值,但某些列可能只包含零。

import numpy as np

A = np.array([[1, 2], [2, 3], [3, 4]])                              # (3, 2)
B = np.array([[4, 5], [5, 6], [6, 7], [7, 8], [8, 9]])              # (5, 2)
M = np.array([[0, 0, 0, 1, 0], [1, 1, 0, 0, 0], [0, 0, 0, 0, 1]])   # (3, 5)

diff = A[:,None] - B[None,:]                                        # (3, 5, 2)
distances = np.linalg.norm(diff, ord=2, axis=2)                     # (3, 5)
masked_distances = distances * M                                    # (3, 5)

字符串
这段代码可以工作,但是它计算了很多我不需要的范数,所以它执行了很多不必要的计算,特别是当AB变得更大的时候。有没有更有效的方法来只计算我需要的范数?我愿意接受其他软件包的建议。
我曾考虑过使用稀疏数组来计算masked_distances,但我在scipy中使用超过2维的稀疏数组(即diff)时遇到了问题。
我也试过创建一个向量化函数,这也是可行的,但是当AB增长到有数千条记录时,速度会更慢。

def conditional_diff(a, b, m):
    if m:
        return a-b
    else:
        return 0

conditional_diff_vec = np.vectorize(conditional_diff, otypes=[float])
masked_diff = conditional_diff_vec(A[:,None], B[None,:], M[:,:,None])
masked_distances = norm(masked_diff, ord=2, axis=2)

lxkprmvk

lxkprmvk1#

我会用numba构造一个Compressed Sparse Row matrix来解决这个问题。CSR矩阵允许我避免使用内存来表示矩阵中的许多零。
Numba允许我使用显式循环而不会损失太多性能。这允许我使用if来避免计算未使用的距离。

import numba as nb
import numpy as np
import scipy
import math

A_big = np.random.rand(2000, 10)
B_big = np.random.rand(4000, 10)
M_big = np.random.rand(A_big.shape[0], B_big.shape[0]) < 0.001

@nb.njit()
def euclidean_distance(vec_a, vec_b):
    acc = 0.0
    for i in range(vec_a.shape[0]):
        acc += (vec_a[i] - vec_b[i]) ** 2
    return math.sqrt(acc)

@nb.njit()
def masked_distance_inner(data, indicies, indptr, matrix_a, matrix_b, mask):
    write_pos = 0
    N, M = matrix_a.shape[0], matrix_b.shape[0]
    for i in range(N):
        for j in range(M):
            if mask[i, j]:
                # Record distance value
                data[write_pos] = euclidean_distance(matrix_a[i], matrix_b[j])
                # Record what column this distance value should be in
                indicies[write_pos] = j
                write_pos += 1
        # Record how many entries are in this row
        indptr[i + 1] = write_pos
    # Assert that we wrote to every part of un-initialized memory
    assert write_pos == data.shape[0]
    assert write_pos == indicies.shape[0]
    # Data is returned by modifying parameters to function

def masked_distance(matrix_a, matrix_b, mask):
    N, M = matrix_a.shape[0], matrix_b.shape[0]
    assert mask.shape == (N, M)
    # Convert mask to bool
    mask = mask != 0
    # How many entries will this sparse array have?
    sparse_length = mask.sum()
    # Set up CSR matrix
    # data and indicies do not need to be zero-initialized,
    # and it is faster not to initialize them
    # Holds data values
    data = np.empty(sparse_length, dtype='float64')
    # Holds column that each data value is in
    indicies = np.empty(sparse_length, dtype='int64')
    # Holds the position of the elements of each row within data and indicies
    indptr = np.zeros(N + 1, dtype='int64')
    masked_distance_inner(data, indicies, indptr, matrix_a, matrix_b, mask)
    return scipy.sparse.csr_matrix((data, indicies, indptr), shape=(N, M))

%timeit masked_distance(A_big, B_big, M_big)

字符串
几点注意事项:

  • 我发现在numba中计算欧氏距离比np.linalg.norm(A[i] - B[i])快,但仍然返回相同的结果,这就是为什么euclidean_distance()函数存在的原因。
  • masked_distance()负责设置算法。masked_distance_inner()是所有速度关键的东西。
  • 我将其与问题中的算法进行了基准测试,发现对于形状为(2000,10)和B(4000,10)的A输入,M有0.1%的元素设置为True,它大约快了40倍。
13.5 ms ± 66.6 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
556 ms ± 3.74 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


确切的加速比取决于所涉及的矩阵的大小和掩码的稀疏程度。例如,我对长度为1000的向量进行了基准测试,发现了1000倍的加速比。

  • 我使用np.allclose()检查了这个算法与原始算法的正确性。
  • 你可以通过使用更小的dtypes来加快速度。如果你只需要7位数的距离精度,你可以用float32代替float64。如果你知道矩阵的维数小于231,并且定义的元素永远不会超过231,你可以用int32代替int64

相关问题