numpy 从矩阵中快速找到第一个匹配子矩阵的方法

vsikbqxv  于 2023-10-19  发布在  其他
关注(0)|答案(3)|浏览(153)

我的矩阵很简单,就像:

# python3 numpy
>>> A
array([[0., 0., 1., 1., 1.],
       [0., 0., 1., 1., 1.],
       [0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0.]])
>>> P
array([[0., 0., 0., 0.]])

我需要在A中找到一个与P(1x4)大小相同的全零区域(一个就足够了)。正确答案包括:

(2, 0)  # The vertex coordinates of the all-zero rectangular region that P can be matched
(2, 1)
(3, 0)
(3, 1)
(4, 0)
(4, 1)
# Just get any 1 answer

实际上我的A矩阵将达到30,000 * 30,000的大小。我担心如果写成循环语句会很慢。有什么捷径吗?
P的大小不确定,从1030到400080。同时,A矩阵缺乏规律性,从任意点开始循环可能需要遍历整个矩阵才能成功匹配

ukxgm1gy

ukxgm1gy1#

正如@Julien在评论中指出的那样,一般来说,我们可以使用滑动窗口来完成这类任务。

def find_all_zero_region_by_sliding_window(a, shape):
    x, y = np.nonzero(np.lib.stride_tricks.sliding_window_view(a, shape).max(axis=-1).max(axis=-1) == 0)
    return np.stack((x, y), axis=-1)

find_all_zero_region_by_sliding_window(A, P.shape)

然而,不幸的是,这需要大量的内存。

numpy.core._exceptions.MemoryError: Unable to allocate 11.3 TiB for an array with shape (26001, 29921, 4000) and data type float32
                                                       ^^^^^^^^

作为替代方案,我认为使用Summed-area table是一个好主意。
它类似于上面的滑动窗口方法,但不是找到最大值,我们可以计算总和(非常有效)并搜索它为零的位置。请注意,这假设A不包含任何负值。否则,必须使用numpy.abs
由于我们不需要计算任何给定位置的和,我采用了这个想法并实现了它,只需要一个单行缓存。

import numpy as np
from typing import Tuple

def find_all_zero_region(arr: np.ndarray, kernel_size: Tuple[int, int]) -> np.ndarray:
    input_height, input_width = arr.shape
    kernel_height, kernel_width = kernel_size

    matches = []

    # Calculate summed_line for y==0.
    summed_line = arr[:kernel_height].sum(axis=0)

    for y in range(input_height - kernel_height + 1):
        # Update summed_line for row y.
        if y != 0:  # Except y==0, which already calculated above.
            # Adding new row and subtracting old row.
            summed_line += arr[y + kernel_height - 1] - arr[y - 1]

        # Calculate kernel_sum for (y, 0).
        kernel_sum = summed_line[:kernel_width].sum()
        if kernel_sum == 0:
            matches.append((y, 0))

        # Calculate kernel_sum for (y, 1) to (y, right-edge).
        # Using the idea of a summed-area table, but in 1D (horizontally).
        (all_zero_region_cols,) = np.nonzero(kernel_sum + np.cumsum(summed_line[kernel_width:] - summed_line[:-kernel_width]) == 0)
        for col in all_zero_region_cols:
            matches.append((y, col + 1))

    if not matches:
        # For Numba, output must be a 2d array.
        return np.zeros((0, 2), dtype=np.int64)
    return np.array(matches, dtype=np.int64)

正如您所看到的,这使用了循环,但它应该比您想象的要快得多,因为所需的内存相对较小,并且计算/比较的数量大大减少。这里有一些时间代码。

import time

rng = np.random.default_rng(0)
A = rng.integers(0, 2, size=(30000, 30000)).astype(np.float32)

P = np.zeros(shape=(4000, 80))

# Create an all-zero region in the bottom right corner which will be searched last.
A[-P.shape[0] :, -P.shape[1] :] = 0

started = time.perf_counter()
result = find_all_zero_region(A, P.shape)
print(f"{time.perf_counter() - started} sec")
print(result)
# 3.541154200000001 sec
# [[26000 29920]]

此外,通过使用Numba,此功能甚至可以更快。只需添加如下注解:

import numba

@numba.njit("int64[:,:](float32[:,:],UniTuple(int64,2))")
def find_all_zero_region_with_numba(arr: np.ndarray, kernel_size: Tuple[int, int]) -> np.ndarray:
    ...
started = time.perf_counter()
find_all_zero_region_with_numba(A, P.shape)
print(f"{time.perf_counter() - started} sec")
# 1.6005743999999993 sec

请注意,我实现它是为了查找全零区域的所有位置,但您也可以让它返回第一个位置。由于它使用循环,平均执行时间将更快。

jutyujz0

jutyujz02#

首先,完全分析A可能会很昂贵,特别是因为A内存很大(所以它适合RAM),RAM很慢。我们可以逐行扫描A,以便找出P可以适合Anp最后一行,其中np,mp = P.shapena,ma = A.shape。问题是np可能很大,所以我们需要一个快速的方法来检查。
为了使问题更容易理解,假设我们可以(有效地)预先计算B = A == 0。现在的问题是如何找到一个大小至少为(np,mp)True值的2D区域。要做到这一点,我们可以计算B的最后一行np的逻辑与,并检查结果中是否有至少mp值的序列。例如,如果np=3mp=4,那么我们可以为每个可能的i计算mask = B[i] & B[i+1] & B[i+2],然后检查每个mask是否有4个连续的True。这个操作只不过是一个带有逻辑AND运算符的2D卷积,上面的优化(大大降低了 * 算法复杂度 *)被称为滤波器分离
实际上,注意布尔数的逻辑AND等价于0-1值的乘积。这意味着我们可以使用2D快速傅立叶变换(FFT)来加速卷积。虽然2D FFT可以用来计算这个问题的算法优化(在O(na ma ((log na) + (log ma)))时间),这需要大量的内存,我们不能合并与扫描线策略。我们可以找到另一种策略,使用更少的内存,代价是可能更多的计算(以及较低的最佳复杂度)。
可以看到,连续行的mask大多数都计算相同的东西,特别是当np很大时。实际上,对于i=0,我们计算mask = B[0] & B[1] & B[2],对于i=1,我们计算mask = B[1] & B[2] & B[3]B[1] & B[2]被重新计算两次。对于k个连续行,np-k项被共享/重新计算。因此,如果np很大,最好因式分解运算,以免重新计算许多项。
要做到这一点,一个有效的方法是构建一个包含部分约简的二叉树。例如,我们可以计算node0 = B[0] & B[1]node1 = B[2] & B[3],然后是node2 = node0 & node1,所以如果我们想计算B[0] & B[1] & B[2],我们可以重用二叉树节点。我们可以证明O(log np)节点值足以计算mask(即,np最后几行的交点)。这意味着该解决方案是有效的(类似于FFT)。然而,这并不容易实现。
一个更简单,效率更低的解决方案是通过大小为c来计算线的交点。c需要仔细选择:大到足以避免重复计算几次相同的事情,小到足以让块实际上是有用的。对于相对较大的np值,c = int(ceil(sqrt(np)/2))当然是一个很好的解决方案(当np<=4这样非常小时,不使用块当然更好)。
注意,你可以将布尔值打包成比特,这样可以使线的相交速度明显加快,因为得到的数组小了8倍。这可以通过np.packbits来实现。使用这种策略,布尔值的逻辑AND被逐位AND(如果不是更快的话,也是一样快)所取代。一行长度为30_000的A需要234 KiB的RAM,而相关的布尔值只能存储在4 KiB中。最后一种方法可以更好地适应CPU缓存,加快计算速度。
注意,当找到匹配的扫描线时,可以提前停止计算。另外请注意,如果mp << np,那么首先转置A可能会快得多。
为了提高性能,可以使用Cython或Numba来高效地计算(CPython循环很慢,但Cython/Numba的循环却不慢)。

pb3skfrl

pb3skfrl3#

要查找具有特定高度/宽度的所有零矩形的坐标,您可以将1和0转换为水平连续0的数量,并将其与目标宽度进行比较。然后做同样的事情垂直上的True的结果的横向比较。所有累积到最小垂直尺寸的坐标将表示零矩形的右下角:

import numpy as np

A = np. array([
       [0., 0., 1., 1., 1.],
       [0., 0., 1., 1., 1.],
       [0., 0., 0., 0., 0.],
       [0., 1., 0., 0., 1.],
       [0., 0., 0., 0., 0.]])

height,width = 3,2

H0 = (A == 0)*(np.arange(A.shape[1])+1)[None,:]
H1 = (A != 0)*(np.arange(A.shape[1])+1)[None,:]
Hz = H0 - np.maximum.accumulate(H1,axis=1)

V0 = (Hz >= width)*(np.arange(A.shape[0])+1)[:,None]
V1 = (Hz <  width)*(np.arange(A.shape[0])+1)[:,None]
V2 = V0 - np.maximum.accumulate(V1,axis=0)

bottomRight = np.where(V2 >= height)
topLeft     = bottomRight - np.array([[height-1],[width-1]])

print(*zip(*topLeft))
print(*zip(*bottomRight))

(0, 0) (2, 2)
(2, 1) (4, 3)

通过检查中间结果可能更容易掌握这一点:

Hz水平连续0的数量(1为负值)

array([[ 1,  2, -3, -4, -5],
       [ 1,  2, -3, -4, -5],
       [ 1,  2,  3,  4,  5],
       [ 1, -2,  1,  2, -5],
       [ 1,  2,  3,  4,  5]])
  • 2(或更大)的值对应于匹配宽度要求的水平连续零的结束位置 *
    V2水平尺寸的垂直连续匹配的数量(Hz >=宽度)
array([[-1,  1, -1, -1, -1],
       [-2,  2, -2, -2, -2],
       [-3,  3,  1,  1,  1],
       [-4, -4, -4,  2, -4],
       [-5,  1,  1,  3,  1]])
  • 包含3(或更多)的位置对应于大小为3x 2的零个矩形的右下角 *
    业绩

这种“整个矩阵”方法在处理非常大的数据时速度很慢(30,000 x 30,000需要220秒)。由于您只查找一个示例,因此混合矢量化/循环方法可能会给予更好的结果:

V2       = np.zeros(A.shape[1])
rowRange = np.arange(A.shape[1])+1
for r,row in enumerate(A==0):
    H0 = row*rowRange
    H1 = (~row)*rowRange
    Hz = (H0 - np.maximum.accumulate(H1)) >= width
    V2 = V2*Hz + Hz
    if np.any(V2>=height):
        print(time()-start,"found",(r,np.where(V2>=height)[0][0]))
        break

这种混合方法使用相同的技术,但逐行而不是在整个矩阵上进行。如果矩阵不是正方形,您可以根据较小的维度选择按列或按行进行处理,从而进一步优化。由于Pyhton循环比向量化的numpy计算慢,这应该会最大限度地减少Python代码开销。
在一个30,000 x 30,000的矩阵上,当没有找到任何匹配时只花了9秒(这是最坏的情况)。它在较小的尺寸上并不更快(例如500 x 500),但它确实减少了numpy操纵的内存量,并最终赶上了。
在这两种情况下,速度与P的大小无关。

相关问题