numpy 矢量化在另一个数组中搜索元素的For循环

ylamdve6  于 2023-03-18  发布在  其他
关注(0)|答案(1)|浏览(156)

我有两个数组,看起来像这样:
数组1(locs_array)是一个二维数组,包含我称为lon和lat的值对,例如:

array([[-122.463425,   47.195741],
       [-122.498139,   47.190166]])

数组2(parents_array)也是包含以下列[id、centerlon、centerlat、upperlon、lowerlon、upperlat、lowerlat]的2D数组,例如:

array([[   1.        , -122.463425  ,   47.195741  , -122.46331271,
        -122.46353729,   47.19585367,   47.19562833],
       [   2.        , -122.498149  ,   47.190275  , -122.49803671,
        -122.49826129,   47.19038767,   47.19016233]])

简单介绍一下上下文,第一个数组是lon/lat位置的列表,而第二个数组包含网格系统的定义,该网格系统通过存储lon和lat的中心、上坐标和下坐标来实现。
我基本上是在尝试找到最快的方法来找到(数组1中的)各个位置属于(数组2中的)哪些网格。
这就是我目前所做的:
下面是我的函数,用来查找给定lon/lat对的父对象:

def _findFirstParent(self, lon, lat, parents_array):
    parentID = -1
    a = np.where(parents_array[:,3] >= lon)[0] # indices with upperLon >= lon
    b = np.where(parents_array[:,4] <= lon)[0] # indices with lowerLon <= lon
    c = np.intersect1d(a, b) # lon matches

    d = np.where(parents_array[:,5] >= lat)[0] # indices with upperLat >= lat
    e = np.where(parents_array[:,6] <= lat)[0] # indices with lowerLat <= lat
    f = np.intersect1d(d, e) # lat matches

    g = np.intersect1d(c, f) # both match
    if len(g) > 0:
        parentID = g[0]
    return parentID

我用下面的代码调用它:

for i in range(len(locs_array)):
     each_lon = locs_array[i][0]
     each_lat = locs_array[i][1]
     parentID = findFirstParent(each_lon, each_lat, parents_array)

数组1(位置数组)包含超过1亿条记录,但我想如果需要的话,我可以将其分解为更小的块,数组2(网格数组)包含超过100万条记录。
我有什么选择可以让这个更快?任何帮助将不胜感激。

3htmauhk

3htmauhk1#

对这样的循环进行矢量化的一种方法是广播,强制numpy将loc_array中的任何条目与parent_array中的任何条目进行比较。
例如,如果您试图找到A的每个整数,B的哪个整数是它的倍数,而不是迭代A的值,您可以

A=np.array([1,2,3,4,5,6,7,8,9,10])
B=np.array([11,12,13,14,15,16,17,18,19,20])

res = B[None,:]%A[:,None] == 0

这将返回一个二维数组,例如当B[j]A[i]的倍数时res[i,j]为True,然后np.argmax(B[None,:]%A[:,None] == 0, axis=1)给出答案(假设总是有一个,这里就是这种情况)
所以在你的情况下

(locs_array[None,:,0]<=parents_array[:,None,3]) & (locs_array[None,:,0]>=parents_array[:,None,4]) & (locs_array[None,:,1]<=parents_array[:,None,5]) & (locs_array[None,:,1]>=parents_array[:,None,6])

是索引[i,j]的值为True的二维数组,当且仅当locs_array[j]parents_array[i]的边界内
所以

idx = np.argmax((locs_array[None,:,0]<=parents_array[:,None,3]) & (locs_array[None,:,0]>=parents_array[:,None,4]) & (locs_array[None,:,1]<=parents_array[:,None,5]) & (locs_array[None,:,1]>=parents_array[:,None,6]), axis=0)

len(locs_array)个整数的数组,每个整数是parents_array中每个位置所在的条目的索引。换句话说,idx[j]=i意味着locs_array[j]parents_array[i]的边界内。我相信这就是您想要的。
两个警告

  • 这是假设总是有答案。如果loc_array的条目jparents_array的任何条目都没有边界,则idx[j]将为0。因此,如果结果为0,您可能需要再次检查结果(因为0可能意味着该位置包含在parents_array[0]中,也可能不包含在parents_array[0]中)。或者,更好的解决方案是使用一个标记:设parents_array[0]是一个不可能的项(例如上界小于下界,所以它不能包含任何东西),那么,你知道如果index是0,这意味着没有解。
  • 这只是你的算法的向量化,我很确定它比你的实现快得多,因为你在python中迭代(用你的for循环),而我在numpy中迭代(让broadcasting做所有的组合),但这仍然是完全相同的计算:比较所有locs_array[j]和所有parents_array[i]。它只是numpy为我们处理for循环。所以,同样,当然要快得多(不能对它进行基准测试,因为您没有提供[mre]),但仍然是相同的算法。评论中建议的是另一种(更快,具有不同的O复杂度)算法,这可以得益于parents_array被排序的知识,例如,它允许二分法搜索,但是不容易矢量化。

相关问题