Numpy中计算量大的数学函数的寻优

imzjd6km  于 2023-08-05  发布在  其他
关注(0)|答案(3)|浏览(106)

我最近一直在开发一个Python脚本,它实现了一个特定的数学函数,如下图所示
x1c 0d1x的数据
其中指数化是周期性的,并且1 <= j <= n。该函数相对复杂,灵感来自以前的question。该代码的主要目的是计算大型数组x(大小为5000或更大)的数学函数。下面是这个函数在Python中的当前实现:

import numpy as np
import sys, os

def compute_y(x, L, v):
    n = len(x)
    y = np.zeros(n)

    for k in range(L+1):
        # Generate the indices for the window around each j, with periodic wrapping
        indices = np.arange(-k, k+1)

        # Compute the weights
        weights_1 = k - np.abs(indices)
        weights_2 = k + 1 - np.abs(indices)
        weights_d = np.ones(2*k+1)

        # For each j, take the elements in the window around j and multiply by the weights
        x_matrix = np.take(x, (np.arange(n)[:, None] + indices) % n, mode='wrap')
        
        exp_1 = np.exp(-np.sum(weights_1[None, :] * x_matrix, axis=1)/v)
        exp_2 = np.exp(-np.sum(weights_2[None, :] * x_matrix, axis=1)/v)
        denom = np.sum(weights_d[None, :] * x_matrix, axis=1)

        # Compute the weighted sum for each j and add to the total
        y += (exp_1 - exp_2)/denom

    return y

# Test the function
x = np.random.rand(5000)
L = len(x)//2
v = 1.4
y = compute_y(x, L, v)

字符串
我目前面临的问题是,这段代码虽然是功能性的和向量化的,但比预期的要慢得多,特别是在应用于大型数组时。我相信这种缓慢的主要来源是for循环,它处理生成索引,计算权重,求和和计算指数。
因此,我正在寻找关于如何加速此代码的指导和建议,特别是对于大小为5000或更大的数组。我特别感兴趣的是通过Numpy利用向量化来加快计算的方法。
如果你能帮忙的话,我将不胜感激。在此先谢谢您!

bihw5rsg

bihw5rsg1#

卷积

我认为,最重要的是要注意,这或多或少是一个卷积:

exp_1 = np.exp(-np.sum(weights_1[None, :] * x_matrix, axis=1)/v)

字符串
如果你看x_matrix的值,它们是x在位置-1,0,1,...的x值。围绕每个x值。然后乘以权重。这是一个卷积。
我们为什么要关心?这很有用,因为有人已经做了fast libraries for doing convolutions
这里的核心思想是用三个卷积代替np.take(),避免创建x_matrix数组。

import scipy.signal

def compute_y_convolution(x, L, v):
    n = len(x)
    y = np.zeros(n)
    x3 = np.tile(x, 3)

    for k in range(L+1):
        # Generate the indices for the window around each j, with periodic wrapping
        indices = np.arange(-k, k+1)

        # Compute the weights
        weights_1 = k - np.abs(indices)
        weights_2 = k + 1 - np.abs(indices)
        weights_d = np.ones(2*k + 1)

        conv = scipy.signal.convolve(x3, weights_d, mode='same')[n:-n]
        exp_1 = np.exp(-(scipy.signal.convolve(x3, weights_1, mode='same')[n:-n])/v)
        exp_2 = np.exp(-(scipy.signal.convolve(x3, weights_2, mode='same')[n:-n])/v)

        # Compute the weighted sum for each j and add to the total
        y += (exp_1 - exp_2)/conv

    return y


(Why在进行卷积之前,这是否创建了x数组的3个副本?在每个数组的边缘,您希望它环绕并访问数组另一端的元素,但scipy.signal.convolve只会将卷积的这些部分视为零。
这工作得很好,在5000个元素的数组上实现了141倍的加速。(718秒vs. 5.06秒)

复用卷积

卷积是相当昂贵的,我们最终在前面的例子中每个循环都要做三次卷积。我们能做得更好吗?
让我们打印出卷积每个循环使用的权重:

k=0
weights_1=array([0])
weights_2=array([1])
weights_d=array([1.])
k=1
weights_1=array([0, 1, 0])
weights_2=array([1, 2, 1])
weights_d=array([1., 1., 1.])


我们可以注意到三件事:

  • 分母中的权重都是1,这是均匀的滤波器响应。Scipy有一个specialized function,对于均匀过滤器来说更快。
  • weights_2的值等于weights_1的值加上均匀滤波器。
  • weights_1的值相当于上一次循环中weights_2的值。

利用这些观察,我们可以从3到1个卷积。

def compute_y_reuse(x, L, v):
    n = len(x)
    y = np.zeros(n)

    last_exp_2_raw = np.zeros(n)
    for k in range(L+1):
        uniform = scipy.ndimage.uniform_filter(x, size=2*k + 1, mode='wrap') * (2*k + 1)
        exp_1_raw = last_exp_2_raw
        exp_1 = np.exp(-exp_1_raw/v)
        exp_2_raw = exp_1_raw + uniform
        exp_2 = np.exp(-exp_2_raw/v)

        # Compute the weighted sum for each j and add to the total
        y += (exp_1 - exp_2)/uniform
        
        last_exp_2_raw = exp_2_raw

    return y


这在5000个元素的阵列上实现了1550倍的加速比。(718秒vs. 0.462秒)

删除最后一个卷积

我进一步研究了这个问题,并试图删除最后一个卷积。本质上,这个想法是,在前一个循环中,我们计算了N个最接近的元素的总和,而下一个循环,我们计算了N+2个最接近的元素的总和,所以我们可以把最边上的2个元素相加。
我尝试使用np.roll()来完成这个任务,但发现它比uniform_filter()慢,因为它必须复制数组。最终我找到了this thread,它让我找到了解决这个问题的方法。
此外,由于exp_1_raw与上一次迭代的exp_2_raw相同,因此我们可以通过保存该次迭代的输出来重用np.exp()调用。

def fast_roll_add(dst, src, shift):
    dst[shift:] += src[:-shift]
    dst[:shift] += src[-shift:]

def compute_y_noconv(x, L, v):
    n = len(x)
    y = np.zeros(n)

    last_exp_2_raw = np.zeros(n)
    last_exp_2 = np.ones(n)
    uniform = x.copy()
    for k in range(L+1):
        if k != 0:
            fast_roll_add(uniform, x, k)
            fast_roll_add(uniform, x, -k)
        exp_1_raw = last_exp_2_raw
        exp_1 = last_exp_2
        exp_2_raw = exp_1_raw + uniform / v
        exp_2 = np.exp(-exp_2_raw)

        # Compute the weighted sum for each j and add to the total
        y += (exp_1 - exp_2) / uniform
        
        last_exp_2_raw = exp_2_raw
        last_exp_2 = exp_2
    return y


这在5000个元素的阵列上实现了3100倍的加速比。(718秒vs. 0.225秒)它也不再需要scipy作为依赖项。

dm7nw8vv

dm7nw8vv2#

我的解决方案比@Nick ODell的慢得多,但它仍然比原始代码快,而且简单易读。
我几乎只是走了一条天真的路线,使用嵌套循环编写函数,然后使用numba来加速它。更新:感谢@Nin17建议我在compute_y上使用njit(parellel=True),并使用nb.prange而不是range

import numpy as np
import numba as nb

@nb.njit
def compute_yj(j, x, L, invv, n):
    res = 0
    for k in range(L+1):
        t1 = 0
        t2 = 0
        d = 0
        for i in range(-k, k+1):
            xijn = x[(j+i)%n]
            t1 += (k - np.abs(i))*xijn*invv
            t2 += (k + 1 - np.abs(i))*xijn*invv
            d += xijn
        res += (np.exp(-t1) - np.exp(-t2))/d
        
    return res

@nb.njit(parallel=True)
def compute_y(x, L, v):
    n = x.shape[0]
    y = np.zeros(n)
    invv = 1/v
        
    for j in nb.prange(n):
        y[j] = compute_yj(j, x, L, invv, n)
        
    return y

字符串
测试:

# run a lightweight call for the compilation
x = np.random.rand(2)
L = len(x)//2
v = 1.4
compute_y2(x, L, v)

# actual call
x = np.random.rand(5000)
L = len(x)//2
y = compute_y(x, L, v)


对于1000个值,它的运行时间约为1.5秒,对于5000个值,它的运行时间约为43秒(相比之下,原始代码的运行时间约为8秒和800秒)。这些时间包括small(大小为2)的编译时间。

50few1ms

50few1ms3#

在我的系统(一台小型笔记本电脑)上,将rand()数组大小从5000更改为1000的原始代码在9秒内运行。将其更改为使用多处理将其减少到5秒。我敢打赌你的机器有更多的核心,所以会更快。
代码变更为:

def compute_y(x, L, v, k):
    # everything as before except no loop, k is already here

def run_one(k):
    return compute_y(x, L, v, k)

if __name__ == '__main__':
    import concurrent.futures
    pool = concurrent.futures.ProcessPoolExecutor()
    y = sum(pool.map(run_one, range(L+1), chunksize=10))

字符串
值得一提的是,np.take()行似乎消耗了大量时间。如果你想进一步优化,我会尝试攻击它。

相关问题