提前结束Numpy计算

bttbmeg0  于 2023-10-19  发布在  其他
关注(0)|答案(2)|浏览(96)

给定两个大数组,

A = np.random.randint(10,size=(10000,2))
B = np.random.randint(10,size=(10000,2))

我想确定是否有向量的叉积为零。我们可以做

C = np.cross(A[:,None,:],B[None,:,:])

然后检查C是否包含0。

not C.all()

然而,该过程需要计算所有叉积,这可能是耗时的。相反,我更愿意让numpy执行叉积,但如果在任何点达到零,那么只需削减整个操作并提前结束。numpy是否有这样一个“提前终止”操作,如果numpy操作达到某个条件,它会提前终止?比如说

np.allfunc()
np.anyfunc()

上面的例子是这样一种情况,其中A和B在某个点具有零叉积的可能性非常高(实际上很可能发生在开始附近),以至于执行python-for-loop(yuck!)比使用numpy高度优化的代码快得多。
一般来说,确定A和B是否有零叉积的最快方法是什么?

jchrr9hc

jchrr9hc1#

numpy是否有这样一个“提前终止”操作,如果numpy操作达到某个条件,它会提前终止?

**简单地说,不,Numpy没有。**实际上,这比那要复杂一点。

实际上,您提出的allfuncanyfunc函数类似于np.allnp.any。这样的功能会提前终止。下面是我的机器上的一个证明:

v = np.random.randint(0, 2, 200*1024*1024) > 0

# 2.74 ms ± 61.5 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit -n 10 v.all()

# 3.16 ms ± 46.5 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
v[0:v.size//16] = True
%timeit -n 10 v.all()

# 3.72 ms ± 77.7 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
v[0:v.size//8] = True
%timeit -n 10 v.all()

# 4.64 ms ± 80.4 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
v[0:v.size//4] = True
%timeit -n 10 v.all()

# 6.67 ms ± 127 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
v[0:v.size//2] = True
%timeit -n 10 v.all()

# 10.5 ms ± 69.9 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
v[0:v.size] = True
%timeit -n 10 v.all()

前者是如此之快,它不可能覆盖整个阵列,因为内存吞吐量将达到71 GiB/s,而我的RAM只能达到理论上的最大带宽47.7 GiB/s(实际上约为40 GiB/s)。我的CPU缓存远远不足以存储这个数组的重要部分,而这个数组占用了200 MiB的RAM,因此每个布尔值占用1个字节的RAM。奇怪的是,在第一种情况下,allany的速度非常令人失望,因为v[2]实际上是False。Numpy当然会逐块执行计算,并仅在计算完一个块后执行提前终止(如果需要)。
anyall的问题是它们需要一个布尔数组in参数。因此,布尔数组需要首先完全计算,这就破坏了任何提前终止的好处
或者,Numpy提供了ufuncs,旨在以逐个元素的方式对数组进行操作。ufunc的一个例子是np.ufunc.reduce。它们可以被用来“组成算法”,这样理论上提前终止是可能的。这种解决方案比通常的非ufunc操作要快得多,因为可以提前终止。然而,当终止只能在最近才可能时,ufunc将比非ufunc操作慢得多。事实上,与非ufunc操作相比,ufunc(目前)从根本上是低效的。这是因为ufunc需要对每个项目进行昂贵的(本机)* 函数调用 *。此函数调用应用于标量项,可防止几个关键优化,如使用 *SIMD指令 *。尽管如此,ufuncs通常比逐个元素使用纯Python代码更快,这要归功于“向量化”。在您的用例中,当前可以组合ufuncs的方式是AFAIK,不足以编写受益于提前终止的高效代码。
在Numpy中进行提前终止的标准方法是将计算拆分为许多(手动)。这种解决方案通常很快,因为它是矢量化的,通常是SIMD友好的,并且缓存效率高(假设块足够小)。只要块足够大,Numpy的开销就很小。下面是一个示例:

def compute_by_chunk(A, B):
    chunkSize = 256
    for i in range(0, A.size, chunkSize):
        for j in range(0, B.size, chunkSize):
            C = np.cross(A[i:i+chunkSize,None,:], B[None,j:j+chunkSize,:])
            if not C.all():
                # print(f'Zero found in chunk ({i},{j})')
                return True
    return False

对于你的输入示例,在我的机器上需要239 µs,而天真的Numpy实现需要234 ms。在这种情况下,这大约快1000倍。对于更小的块,如32 x32,计算速度甚至更快:32.3微秒
实际上,当有很多零时,较小的块更好(因为计算第一个块需要时间)。256 x256在大多数机器上应该相对较好(缓存友好,内存效率高,Numpy开销低)。
一个更简单的解决方案(通常也更快)是使用模块/工具,如Cython或Numba,编译您的代码,因此由于编译到本机函数,循环可以更快。但是,使用此解决方案,您需要自己重新实现叉积。事实上,Numba还不支持它,如果你只是在循环中调用Numpy函数,Cython也不会更快(比使用Numpy的纯Python代码更快)。

btxsgosb

btxsgosb2#

这个问题已经接受了答案,但我想在这里张贴我的答案,为那些谁谷歌零交叉产品和降落在这个问题上。
我建议用更数学的方法。
a0*b1 = a1*b0时,二维向量ab的叉积为零。这可以转换为a0/a1 = b0/b1。因此,可以如下实现。

np.any(np.isin(A[..., 0] / A[..., 1], B[..., 0] / B[..., 1]))

为了使这一点起作用,我们必须首先处理零除法问题。

  • 对于n/0的情况,我们可以用np.inf来填充它。注意,在numpy中,np.inf == np. inf。
  • 对于0/0,我们可以立即返回True。从np.cross([(0,0)], [(n,n)]) == [0]开始。
import numpy as np

def has_zero_vector(a):
    """Return True if a contains (0, 0)."""
    return not np.all(np.logical_or(a[..., 0], a[..., 1]))

def safe_divide(a, b):
    """Compute a / b. Where b is 0, fill in np.inf."""
    out = np.full(len(a), dtype=np.float64, fill_value=np.inf)
    np.divide(a, b, out=out, where=b != 0)
    return out

def has_zero_cross_product(a, b):
    # Since 0/0 is messy, returns True immediately if there is one.
    if has_zero_vector(a) or has_zero_vector(b):
        return True

    ar = safe_divide(a[..., 0], a[..., 1])
    br = safe_divide(b[..., 0], b[..., 1])
    return np.any(np.isin(ar, br))

A = np.random.randint(10,size=(10000,2))
B = np.random.randint(10,size=(10000,2))
has_zero_cross_product(A, B)

它不尝试提前终止(除了0/0),但比OP的代码快20000倍,比Jérôme的分块计算快20倍。它之所以这么快,主要是因为你的输入是不变的,有很多重复。这使得计算np.isin变得容易。
请注意,此函数也与分块计算兼容。然而,由于计算量太小,开销可能大于提前终止的好处,并且可能几乎没有区别。
从另一个Angular 来看,(不确定这是否有助于OP,但是)即使零叉积的概率很低,这个函数仍然有效。
例如,给定以下输入,零叉积存在的概率为50-50。

np.random.seed(0)
A = np.random.randint(0, 40000, size=(10000, 2))
B = np.random.randint(0, 40000, size=(10000, 2))
has_zero_cross_product(A, B)
# False

np.random.seed(1)
A = np.random.randint(0, 40000, size=(10000, 2))
B = np.random.randint(0, 40000, size=(10000, 2))
has_zero_cross_product(A, B)
# True

即使在这种情况下,它仍然比OP的代码快150倍,比Jérôme的分块计算快100倍以上。请注意,在这种情况下,使这个函数分块计算只会使它更慢。
作为最后的补充,如果允许使用numba,可以进行以下优化:

import numba

@numba.njit(cache=True)
def has_intersect_sorted(a, b):
    """Determine if a and b have a common element."""
    # Both a and b must be sorted.
    # assert np.all(a[1:] >= a[:-1])
    # assert np.all(b[1:] >= b[:-1])
    i = 0
    j = 0
    while i < len(a) and j < len(b):
        # By modifying this if statement as below, it can also support float input arrays.
        # But, it may be inaccurate due to rounding errors.
        # if np.isclose(a[i], b[j], atol=0.0, rtol=1e-14):
        #     return True
        if a[i] == b[j]:
            return True

        if a[i] < b[j]:
            i += 1
        else:
            j += 1

    return False

def has_zero_cross_product2(a, b):
    if has_zero_vector(a) or has_zero_vector(b):
        return True

    ar = safe_divide(a[..., 0], a[..., 1])
    br = safe_divide(b[..., 0], b[..., 1])
    ar.sort()  # Sorting is required.
    br.sort()
    return has_intersect_sorted(ar, br)

这种优化的主要目的不是提前终止,而是在O(n)内计算交集(严格来说,排序需要O(n log n))。因此,这使得它在50-50的情况下又快了1.5倍,但在高概率的情况下几乎没有区别。
总而言之,除非在输入的最开始总是零叉积,否则在这种情况下不需要提前终止。

相关问题