numpy 将np.einsum翻译为更有性能的内容

aiazj4mn  于 2022-11-10  发布在  其他
关注(0)|答案(2)|浏览(166)

使用python/numpy,我拥有以下np.einsum

np.einsum('abde,abc->bcde', X, Y)

Y是稀疏的:对于每个[a,b],只有一个c == 1;所有其他:=0。以轴的相对大小为例,X.shape约为(1000, 5, 30, 30),而Y.shape等效于(1000, 5, 300)
这个手术非常昂贵;我想让它更有效率。首先,Einsum不是并行化的。另一方面,因为Y是稀疏的,所以我实际上计算的乘法运算是我应该执行的乘法运算的300倍。事实上,当我在n上使用循环编写等同于这个einsum的代码时,我得到了大约3倍的加速。但这显然不是很好。
我应该如何让这件事更具表现力呢?我试过使用np.tensordot,但我不知道如何从中获得我想要的(我仍然遇到了稀疏/密集的问题)。

roejwanj

roejwanj1#

使用Numba可以很容易地做到这一点:

import numba

@numba.njit('float64[:,:,:,::1](float64[:,:,:,::1], float64[:,:,::1])', fastmath=True, parallel=True)
def compute(x, y):
    na, nb, nd, ne = x.shape
    nc = y.shape[2]
    assert y.shape == (na, nb, nc)

    out = np.zeros((nb, nc, nd, ne))

    for b in numba.prange(nb):
        for a in range(na):
            for c in range(nc):
                yVal = y[a, b, c]
                if np.abs(yVal) != 0:
                    for d in range(nd):
                        for e in range(ne):
                            out[b, c, d, e] += x[a, b, d, e] * yVal

    return out

请注意,对于顺序代码,先迭代a,然后迭代b会更快。也就是说,为了使代码是并行的,已经交换了循环,并在b(这是一个小轴)上执行并行化。在轴a上并行缩减会更有效,但不幸的是,使用Numba不容易做到这一点(需要将矩阵拆分成多个块,因为没有创建线程本地矩阵的简单方法)。
注意:您可以用实际值(即30),以便编译器专门针对此矩阵大小生成更快的代码。
以下是测试代码:

np.random.seed(0)
x = np.random.rand(1000, 5, 30, 30)
y = np.random.rand(1000, 5, 300)
y[np.random.rand(*y.shape) > 0.1] = 0.0 # Make it sparse (90% of 0)

%time res = np.einsum('abde,abc->bcde', x, y)  # 2.350 s
%time res2 = compute(x, y)                     # 0.074 s (0.061 s with hand-written sizes)
print(np.allclose(res, res2))

在10核英特尔Skylake Xeon处理器上,这大约要快32倍。与手写尺寸相比,速度提高了38倍。由于b轴上的并行化,它不能很好地扩展,但使用其他轴将导致较低的内存访问效率。
如果这还不够,最好先转置xy,以改善数据局部性(这要归功于沿a轴的更连续的访问模式)和更好的伸缩性(通过并行化bc轴)。尽管如此,换位通常是昂贵的,所以人们当然需要优化它,以获得更好的速度。

kiz8lqtg

kiz8lqtg2#

如果Y只包含1和0,则Einsum基本上是这样做的:

result = np.zeros(Y.shape[1:] + X.shape[2:], X.dtype)
I, J, K = np.nonzero(Y)
result[J, K] += X[I, J]

但这并不能给出正确的结果,因为有重复的j,k指数。我无法让numpy.add.at运行,但仅对这些索引进行循环仍然是相当快的,至少对于给定的形状和稀疏性是如此。

result = np.zeros(Y.shape[1:] + X.shape[2:], X.dtype)
for i, j, k in zip(*np.nonzero(Y)):
    result[j, k] += X[i, j]

这是我使用的测试代码:

a, b, c, d, e = 1000, 5, 300, 30, 30
X = np.random.randint(10, size=(a,b,d,e))
R = np.random.rand(a, b, c)
K = np.argmax(R, axis=2)
I, J = np.indices((a, b), sparse=True)
Y = np.zeros((a, b, c), int)
Y[I, J, K] = 1

相关问题