如何在Python(NumPy)中提高两个大型批处理数组的交叉二进积的性能?

z4bn682m  于 2023-08-05  发布在  Python
关注(0)|答案(2)|浏览(96)

我想在Python(NumPy)中计算两个二阶Tensor的次对称和主对称交叉并矢积。

C[i,j,k,l] = (
    A[i, k] * B[j, l] + A[i, l] * B[k, j] + B[i, k] * A[j, l] + B[i, l] * A[k, j]
) / 4

字符串
我有两个形状为(3, 3, n)的二阶Tensor,其中最后一个拖尾轴表示批次维度。我已经在我最近发布的Python包hyperelastic中实现了这个函数,并希望得到您对我的代码效率的反馈。我想避免任何JIT之类的东西。
给定两个对称二阶Tensor作为减少向量存储中的NumPy-Arrays,

import numpy as np

# any random 3x3 tensor
F = np.eye(3).reshape(3, 3, 1) + np.random.rand(3, 3, 100000) / 10

# a symmetric 3x3 tensor
C = F.T @ F

# Voigt-notation vector storage 
# (upper triangle items but sorted on the diagonals; starting from the main diag.)
i = [0, 1, 2, 0, 1, 0]
j = [0, 1, 2, 1, 2, 2]
A = C[i, j]


我当前的实现看起来像这样:

import numpy as np

def cdya(A, B):
    i, j = [a.ravel() for a in np.indices((6, 6))]

    a = np.array([(0, 0), (1, 1), (2, 2), (0, 1), (1, 2), (0, 2)])
    b = np.array([0, 3, 5, 3, 1, 4, 5, 4, 2]).reshape(3, 3)

    i, j, k, l = np.hstack([a[i], a[j]]).T

    ik = b[i, k].reshape(6, 6)
    jl = b[j, l].reshape(6, 6)

    il = b[i, l].reshape(6, 6)
    kj = b[k, j].reshape(6, 6)

    C = (A[ik] * B[jl] + A[il] * B[kj]) / 2

    if A is not B:
        C += (B[ik] * A[jl] + B[il] * A[kj]) / 2
        C /= 2

    return C

A = np.random.rand(6, 100000)
B = np.random.rand(6, 100000)

C = cdya(A, B)


在我的机器上,这个功能需要大约。100 ms。在全Tensor存储中使用np.einsum的等效实现需要大约100 ms。160毫秒。

A = np.random.rand(3, 3, 100000)
B = np.random.rand(3, 3, 100000)

C = (
    np.einsum("ij...,kl...->ikjl...", A, B) + np.einsum("ij...,kl...->ilkj...", A, B) +
    np.einsum("ij...,kl...->ikjl...", B, A) + np.einsum("ij...,kl...->ilkj...", B, A)
) / 4


你认为有改进的空间吗?或者我已经找到了一个很好的解决方案?
非常感谢提前!

tpxzln5u

tpxzln5u1#

Numpy的问题是,默认情况下创建了许多相对较大的临时数组(为了方便起见)。这是昂贵的。你可以做就地操作,尽管这很麻烦。此外,你也可以因式分解最后的乘法是否满足条件:
下面是一个例子:

def cdya_opt(A, B):
    i, j = [a.ravel() for a in np.indices((6, 6))]

    a = np.array([(0, 0), (1, 1), (2, 2), (0, 1), (1, 2), (0, 2)])
    b = np.array([0, 3, 5, 3, 1, 4, 5, 4, 2]).reshape(3, 3)

    i, j, k, l = np.hstack([a[i], a[j]]).T

    ik = b[i, k].reshape(6, 6)
    jl = b[j, l].reshape(6, 6)

    il = b[i, l].reshape(6, 6)
    kj = b[k, j].reshape(6, 6)

    x1, x2, x3, x4 = A[ik], B[jl], A[il], B[kj]
    np.multiply(x1, x2, out=x1)
    np.multiply(x3, x4, out=x3)
    np.add(x1, x3, out=x1)

    if A is not B:
        x5, x6, x7, x8 = B[ik], A[jl], B[il], A[kj]
        np.multiply(x5, x6, out=x5)
        np.multiply(x7, x8, out=x7)
        np.add(x5, x7, out=x5)
        np.add(x5, x1, out=x1)
        np.multiply(x1, 0.25, out=x1)
    else:
        np.multiply(x1, 0.5, out=x1)

    return x1

字符串
如果使用其他包,如**numexpr**对你来说很好,那么你可以编写一个更快的实现:

import numexpr as ne

def cdya_opt2(A, B):
    i, j = [a.ravel() for a in np.indices((6, 6))]

    a = np.array([(0, 0), (1, 1), (2, 2), (0, 1), (1, 2), (0, 2)])
    b = np.array([0, 3, 5, 3, 1, 4, 5, 4, 2]).reshape(3, 3)

    i, j, k, l = np.hstack([a[i], a[j]]).T

    ik = b[i, k].reshape(6, 6)
    jl = b[j, l].reshape(6, 6)

    il = b[i, l].reshape(6, 6)
    kj = b[k, j].reshape(6, 6)

    x1, x2, x3, x4 = A[ik], B[jl], A[il], B[kj]

    if A is not B:
        x5, x6, x7, x8 = B[ik], A[jl], B[il], A[kj]
        ne.evaluate('(x1 * x2 + x3 * x4 + x5 * x6 + x7 * x8) * 0.25', out=x1)
    else:
        ne.evaluate('(x1 * x2 + x3 * x4) * 0.5', out=x1)

    return x1

性能结果

以下是我的机器上的结果(i5- 9600 KF CPU,在Windows上运行,使用Numpy 1.24.3):

cdya:       112.0 ms
cdya_opt:    70.7 ms
cdya_opt2:   59.1 ms  <-----


最后一个版本几乎快了两倍。大部分时间都花在像A[ik]这样的操作上,这些操作在Numpy中没有得到很好的优化。使用Cython(或Numba)是使此代码更快的好方法。请注意,Cython不是JIT,而不是Numba。

cgyqldqp

cgyqldqp2#

我的最终解决方案是在可读性和速度之间进行权衡。现在我也利用结果的对称性。该功能需要~ 40 ms才能完成,这太棒了。

def cdya_final(A, B):
    i, j = [a.ravel() for a in np.triu_indices(6)]

    a = np.array([(0, 0), (1, 1), (2, 2), (0, 1), (1, 2), (0, 2)])
    b = np.array([0, 3, 5, 3, 1, 4, 5, 4, 2]).reshape(3, 3)

    i, j, k, l = np.hstack([a[i], a[j]]).T

    ik = b[i, k]
    jl = b[j, l]

    il = b[i, l]
    kj = b[k, j]
    
    ij = b[i, j]
    kl = b[k, l]
    
    C = np.zeros((6, *np.broadcast_shapes(A.shape, B.shape)))
    
    A, B = np.broadcast_arrays(A, B)
    Aik, Bjl, Ail, Bkj = A[ik], B[jl], A[il], B[kj]
    
    values = np.multiply(Aik, Bjl, out=Aik) + np.multiply(Ail, Bkj, out=Ail)

    if A is not B:
        Bik, Ajl, Bil, Akj = B[ik], A[jl], B[il], A[kj]
        
        values += np.multiply(Bik, Ajl, out=Bik) + np.multiply(Bil, Akj, out=Bil)
        values /= 4
    else:
        values /= 2
    
    C[ij, kl] = C[kl, ij] = values

    return C

字符串
谢谢您的帮助!

相关问题