numpy: fast counting of matches between a large number of integer arrays

zqry0prt asked on 2023-06-23

I wonder whether there is an efficient algorithm to count the number of matching integers between a large number of integer arrays. My current Cython implementation is shown below.
match_ints.pyx

cimport cython
from libc.stdlib cimport calloc, free

import numpy as np
cimport numpy as np

np.import_array()

@cython.wraparound(False)
@cython.boundscheck(False)
@cython.initializedcheck(False)
cdef void count_matches(int[:, ::1] target_arrays, int[::1] ref_array, int[::1] num_matches):

    cdef:
        Py_ssize_t i, j
        Py_ssize_t n = target_arrays.shape[0]
        Py_ssize_t c = target_arrays.shape[1]
        Py_ssize_t nf = ref_array.shape[0]
        # ref_array is sorted ascending, so its last element is its maximum;
        # +5 gives a little headroom for the indicator buffer
        Py_ssize_t m = ref_array[nf - 1] + 5
        int * ind = <int *> calloc(m, sizeof(int))
        int k, g

    # indicator array: ind[v] == 1 iff v occurs in ref_array
    for i in range(nf):
        ind[ref_array[i]] = 1

    # for each target row, count the elements whose indicator is set
    for i in range(n):
        k = 0
        for j in range(c):
            g = target_arrays[i, j]
            if g < m and ind[g] == 1:
                k += 1
        num_matches[i] = k

    free(ind)

cpdef count_num_matches(int[:, ::1] target_arrays, int[::1] ref_array):

    cdef:
        Py_ssize_t n = target_arrays.shape[0]
        int[::1] num_matches = np.zeros(n, dtype=np.int32)

    count_matches(target_arrays, ref_array, num_matches)

    return np.asarray(num_matches)

The idea here is simple. The reference array of integers to match against is sorted in ascending order (via the sort method). Taking advantage of the fact that the integers involved are not large, an indicator array ind is created whose length is the largest integer in the reference array (plus 5, to keep indexing in range). Every reference integer is treated as an index and the corresponding position in ind is set to 1. Each target array is then traversed, and an element counts as a match exactly when, used as an index into ind, it finds a 1 there.
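As a cross-check for the Cython extension, the same counts can be computed in pure NumPy with np.isin (far slower at the real scale, but convenient for validating results; the sizes below are shrunk for that reason):

```python
import numpy as np

# shrunken stand-in for the real data used in the test script
rng = np.random.default_rng(0)
x = rng.integers(50, 6000, size=(1000, 40)).astype(np.int32)
ref_x = np.sort(rng.integers(100, 2500, size=800).astype(np.int32))

# np.isin tests every element of x for membership in ref_x;
# summing the boolean mask row-wise yields the per-array match counts
num_matches = np.isin(x, ref_x).sum(axis=1)
```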
The test script is test_main_counts.py:

# test_main_counts.py
from match_ints import count_num_matches
import numpy as np

def count_num_matches_main():
    x = np.random.randint(50, 6000, size=(1000000, 40), dtype=np.int32)
    ref_x = np.random.randint(100, 2500, size=800, dtype=np.int32)

    ref_x.sort()

    return count_num_matches(x, ref_x)

if __name__ == "__main__":
     nums = count_num_matches_main()
     print(nums[:10])

The setup file:

from setuptools import setup
from Cython.Build import cythonize
import numpy as np

setup(
    ext_modules=cythonize(
        "match_ints.pyx",
        compiler_directives={
            "language_level": "3",
        }
    ),
    include_dirs=[
        np.get_include()
    ]
)

Since none of the integers are very large and there are many duplicates (in my real application, millions of arrays contain only a few thousand unique integers), is there a suitable algorithm that improves on this kind of problem, e.g. by exploiting the small number of unique integers?
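One direction along those lines (a sketch of my own, not from the answers below): since the number of distinct values is small, each unique value can be classified against the reference set once, and the verdict broadcast back through the inverse index that np.unique returns:

```python
import numpy as np

# hypothetical small-scale stand-in for the real data
rng = np.random.default_rng(42)
x = rng.integers(50, 6000, size=(2000, 40)).astype(np.int32)
ref_x = np.sort(rng.integers(100, 2500, size=800).astype(np.int32))

# classify each distinct value once, then map the verdicts back
uniq, inv = np.unique(x, return_inverse=True)
hit = np.isin(uniq, ref_x)              # one membership test per unique value
num_matches = hit[inv].reshape(x.shape).sum(axis=1)

# agrees with the direct per-element membership test
assert np.array_equal(num_matches, np.isin(x, ref_x).sum(axis=1))
```

The win grows as the ratio of total elements to unique values grows, which matches the "millions of arrays, a few thousand unique integers" regime described above.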


kxxlusnw1#

target_arrays is fixed - nice!
In that case you can preprocess the target arrays and build a kind of "inverted index" (a concept from the search-engine and database world). For each possible value, create an array of the indices of the target arrays containing that value, like this:

inverted[] 
val   target_arrays_containing_val
1     2,5,100, 999999  
2     7, 13, 3141592
...
6000  3,111, 222,444,555,888

Now, for every element of ref_arr, increment the counters of the corresponding arrays:

for x in ref_arr:
   for a in inverted[x]:
       num_matches[a] += 1

With short target arrays (40 elements in the example) the sublists of inverted[] should be short, so you should see a significant gain: from the currently dominant O(n*c) complexity down to O(nf * mean_sublist_size).
Below is a simple Python implementation.
I deduplicate within the targets, but for simplicity do not handle duplicates in ref.

import random

targ = [[random.randint(1,50) for i in range(6)] for j in range(10)]
ref = [random.randint(1,30) for i in range(15)]

mx = max([max(x) for x in targ])
inverted = [[] for i in range(mx+1)]
num_matches = [0]*len(targ)
for ind, x in enumerate(targ):
    for val in x:
        if ind not in inverted[val]:
            inverted[val].append(ind)

for x in ref:
   for a in inverted[x]:
       num_matches[a] += 1

for x in targ:
    print(x)
print()
print(inverted)
print(ref)
print(num_matches)

>>>>
[39, 27, 30, 31, 12, 45]
[45, 45, 36, 16, 33, 10]
[32, 34, 48, 35, 36, 36]
[18, 16, 6, 19, 1, 35]
[32, 13, 1, 2, 30, 34]
[40, 18, 37, 18, 25, 47]
[21, 27, 33, 42, 43, 2]
[22, 47, 2, 34, 25, 26]
[30, 47, 6, 23, 39, 44]
[21, 43, 27, 1, 12, 30]

[[], [3, 4, 9], [4, 6, 7], [], [], [], [3, 8], [], [], [], [1], [],
 [0, 9], [4], [], [], [1, 3], [], [3, 5], [3], [], [6, 9], [7], [8],
 [], [5, 7], [7], [0, 6, 9], [], [], [0, 4, 8, 9], [0], [2, 4], [1, 6], 
 [2, 4, 7], [2, 3], [1, 2], [5], [], [0, 8], [5], [], [6], [6, 9], [8], 
 [0, 1], [], [5, 7, 8], [2]]

[15, 8, 22, 18, 15, 24, 16, 22, 30, 30, 27, 18, 9, 22, 17]

[3, 1, 0, 3, 2, 2, 1, 3, 2, 3]
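As an aside, the counting loop over the inverted index can also be vectorized with np.bincount. A self-contained sketch (using sets for deduplication instead of the list-membership test above, which is my own variation):

```python
import random
import numpy as np

random.seed(0)
targ = [[random.randint(1, 50) for _ in range(6)] for _ in range(10)]
ref = [random.randint(1, 30) for _ in range(15)]

# build the inverted index, deduplicated per target array via sets
mx = max(max(row) for row in targ)
inverted = [set() for _ in range(mx + 1)]
for ind, row in enumerate(targ):
    for val in row:
        inverted[val].add(ind)

# gather every hit target index, then histogram them in one call
hits = [i for x in ref for i in inverted[x]]
num_matches = np.bincount(np.asarray(hits, dtype=np.intp), minlength=len(targ))

# identical to the explicit double loop from the answer
check = [0] * len(targ)
for x in ref:
    for a in inverted[x]:
        check[a] += 1
assert list(num_matches) == check
```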

vlju58qv2#

As suggested in the comments: count the contents into dicts, use set operations on dict.keys(), and in the resulting dict keep the common keys with their minimum occurrence counts while dropping keys unique to one list.
You can avoid sorting plus a separate pass and traverse each list only once while building its dict. With a set of seen_keys, values in later lists that were never seen before can be skipped:

'''This is plain python - working code as a guideline - you would have to transform it
to Cython/numpy yourself'''
# millions of arrays with small amount of unique ints in it
l1 = [1,1,1,1,1,1, 2,2,2,2,2,2, 3,3,3,3,3,3, 4,4,4,4,4,4, 99] 
l2 = [1,1,1,1,1,   2,2,2,2,2,   3,3,3,3,3,   4,4,4,4,4,   98]
l3 = [1,           2,2,         3,3,3]

dicts = []
seen_keys = set(l1) # initial keys from one list, doesn't matter which

# m lists: go once through each list  m times O(n)
for l in [l1,l2,l3]:   # m lists result in m dicts
    curr_d = {} 
    dicts.append(curr_d)

    # o(n) with n=len(list) instead of sorting with O(n*log(n))
    for i in l:
        if i not in seen_keys: continue  # skippable -> missing in earlier list
        # in plain python you can use defaultdict(int) or Counter for speedups
        curr_d.setdefault(i,0) 
        curr_d[i] += 1

# resulting dict for minimal counts of keys that are in ALL lists
total_d = {}
for d in dicts:
    # initial values 
    if not total_d:
        total_d = dict(d.items())
        continue

    # remove all things from total_d that are not in new dict
    # this will reduce runtimes the further you go as the next step has fewer updates
    diffr = total_d.keys() - d.keys()
    for remove in diffr:
        del total_d[remove]

    # reduce count to minimal for any key that is in total_d and new dict
    commn =  total_d.keys() & d.keys()
    for c in commn:
        total_d[c] = min(total_d[c],d[c])  # ternary maybe faster

print(total_d)

Output:

{1: 1, 2: 2, 3: 3}
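Incidentally, the min-count intersection that total_d computes is exactly what collections.Counter's `&` operator provides, which makes for a compact plain-Python reference implementation:

```python
from collections import Counter
from functools import reduce

# same sample lists as above
l1 = [1]*6 + [2]*6 + [3]*6 + [4]*6 + [99]
l2 = [1]*5 + [2]*5 + [3]*5 + [4]*5 + [98]
l3 = [1] + [2]*2 + [3]*3

# Counter & Counter keeps only the common keys, each at its minimum count
total = reduce(lambda a, b: a & b, map(Counter, (l1, l2, l3)))
print(dict(total))  # {1: 1, 2: 2, 3: 3}
```

This is handy for validating a faster Cython/numpy port against a known-correct baseline.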
