如何获取numpy数组中重复元素的所有索引列表

t2a7ltrp  于 11个月前  发布在  其他
关注(0)|答案(9)|浏览(169)

我试图获取numpy数组中所有重复元素的索引,但我目前找到的解决方案对于大型(>20000个元素)输入数组来说效率非常低(大约需要9秒)。想法很简单:

  1. records_array是一个numpy时间戳数组(datetime),我们希望从中提取重复时间戳的索引
  2. time_array是一个numpy数组,包含在records_array中重复的所有时间戳
  3. records是一个django QuerySet(可以很容易地转换为列表),包含一些Record对象。我们希望创建一个由Record的tagId属性的所有可能组合组成的对列表,这些组合对应于从records_array中找到的重复时间戳。
    以下是我目前使用的工作(但效率低下)代码:
tag_couples = [];
for t in time_array:
    users_inter = np.nonzero(records_array == t)[0] # Get all repeated timestamps in records_array for time t
    l = [str(records[i].tagId) for i in users_inter] # Create a temporary list containing all tagIds recorded at time t
    if l.count(l[0]) != len(l): #remove tuples formed by the first tag repeated
        tag_couples +=[x for x in itertools.combinations(list(set(l)),2)] # Remove duplicates with list(set(l)) and append all possible couple combinations to tag_couples

字符串
我很确定这可以通过使用Numpy来优化,但是我找不到一种方法来比较records_arraytime_array的每个元素,而不使用for循环(这不能只使用==进行比较,因为它们都是数组)。

gajydyqb

gajydyqb1#

numpy的矢量化解决方案,在unique()的魔力上。

import numpy as np

# create a test array
records_array = np.array([1, 2, 3, 1, 1, 3, 4, 3, 2])

# creates an array of indices, sorted by unique element
idx_sort = np.argsort(records_array)

# sorts records array so all unique elements are together 
sorted_records_array = records_array[idx_sort]

# returns the unique values, the index of the first occurrence of a value, and the count for each element
vals, idx_start, count = np.unique(sorted_records_array, return_counts=True, return_index=True)

# splits the indices into separate arrays
res = np.split(idx_sort, idx_start[1:])

#filter them with respect to their size, keeping only items occurring more than once
vals = vals[count > 1]
res = filter(lambda x: x.size > 1, res)

字符串
下面的代码是原始答案,它需要更多的内存,使用numpy广播并调用unique两次:

records_array = array([1, 2, 3, 1, 1, 3, 4, 3, 2])
vals, inverse, count = unique(records_array, return_inverse=True,
                              return_counts=True)

idx_vals_repeated = where(count > 1)[0]
vals_repeated = vals[idx_vals_repeated]

rows, cols = where(inverse == idx_vals_repeated[:, newaxis])
_, inverse_rows = unique(rows, return_index=True)
res = split(cols, inverse_rows[1:])


与预期的res = [array([0, 3, 4]), array([1, 8]), array([2, 5, 7])]一样

4sup72z8

4sup72z82#

  • 答案是复杂的,并且取决于数组中唯一元素的大小和数量。
  • 以下内容:
  • 测试具有2M个元素和最多20k个唯一元素的数组。
  • 测试最多80k个元素的数组,最多20k个唯一元素
  • 对于小于40k个元素的数组,测试具有的唯一元素最多为数组大小的一半(例如,10k个元素将具有最多5k个唯一元素)。

包含2M个元素的数组

  • 对于多达大约200个唯一元素,np.wheredefaultdict快,但是比pandas.core.groupby.GroupBy.indicesnp.unique慢。
  • 使用pandas的解决方案是大型阵列的最快解决方案。

最多包含80k个元素的阵列

  • 这更多地取决于阵列的大小和独特元素的数量。
  • defaultdict是约2400个元素的数组的快速选项,特别是具有大量唯一元素的数组。
  • 对于大于40k个元素和20k个唯一元素的数组,pandals是最快的选择。

%timeit第一代

import random
import numpy
import pandas as pd
from collections import defaultdict

def dd(l):
    # default_dict test
    indices = defaultdict(list)
    for i, v in enumerate(l):
        indices[v].append(i)
    return indices

def npw(l):
    # np_where test
    return {v: np.where(l == v)[0] for v in np.unique(l)}

def uni(records_array):
    # np_unique test
    idx_sort = np.argsort(records_array)
    sorted_records_array = records_array[idx_sort]
    vals, idx_start, count = np.unique(sorted_records_array, return_counts=True, return_index=True)
    res = np.split(idx_sort, idx_start[1:])
    return dict(zip(vals, res))

def daf(l):
    # pandas test
    return pd.DataFrame(l).groupby([0]).indices

data = defaultdict(list)

for x in range(4, 20000, 100):  # number of unique elements
    # create 2M element list
    random.seed(365)
    a = np.array([random.choice(range(x)) for _ in range(2000000)])
    
    res1 = %timeit -r2 -n1 -q -o dd(a)
    res2 = %timeit -r2 -n1 -q -o npw(a)
    res3 = %timeit -r2 -n1 -q -o uni(a)
    res4 = %timeit -r2 -n1 -q -o daf(a)
    
    data['defaut_dict'].append(res1.average)
    data['np_where'].append(res2.average)
    data['np_unique'].append(res3.average)
    data['pandas'].append(res4.average)
    data['idx'].append(x)

df = pd.DataFrame(data)
df.set_index('idx', inplace=True)

df.plot(figsize=(12, 5), xlabel='unique samples', ylabel='average time (s)', title='%timeit test: 2 run 1 loop each')
plt.legend(bbox_to_anchor=(1.0, 1), loc='upper left')
plt.show()

字符串

使用2M个元素的测试


的数据




使用多达80k个元素进行测试














bkkx9g8r

bkkx9g8r3#

你也可以这样做:

a = [1,2,3,1,1,3,4,3,2]
index_sets = [np.argwhere(i==a) for i in np.unique(a)]

字符串
这将给你给予一组数组,数组的索引是唯一的元素。

[array([[0],[3],[4]], dtype=int64), 
array([[1],[8]], dtype=int64), 
array([[2],[5],[7]], dtype=int64), 
array([[6]], dtype=int64)]


又道:列表解析中的进一步变化也可以丢弃单个唯一值,并解决许多唯一单个出现元素的速度问题:

new_index_sets = [np.argwhere(i[0]== a) for i in np.array(np.unique(a, return_counts=True)).T if i[1]>=2]


这给出:

[array([[0],[3],[4]], dtype=int64), 
 array([[1],[8]], dtype=int64), 
 array([[2],[5],[7]], dtype=int64)]

drkbr07n

drkbr07n4#

编辑:这个答案查找重复的连续项,而不是一个项的所有副本。例如,在数组[1,2,2,3,2]中,它将返回[1,2]而不是[1,2,4]。由于这仍然是非常有效的,并且可能对某些人有用,我将留下答案。
我发现不使用np.unique,而使用np.diff的速度明显更快,并且更好地处理非排序的初始数组。
为了说明这一点,我运行了@特伦顿麦金尼的基准测试,测试了两个试验数字(200万和20 k),以表明diff解决方案可以消除其他数字。它也不需要排序数组或对数组进行排序,这是一个显著的好处。
函数如下:

def find_repeats(arr: np.ndarray) -> np.ndarray:
    """Find indices of repeat values in an array.

    Args:
        arr (np.ndarray): An array to find repeat values in.

    Returns:
        np.ndarray: An array of indices into arr which are the values which
            repeat.
    """

    arr_diff = np.diff(arr, append=[arr[-1] + 1])
    res_mask = arr_diff == 0
    arr_diff_zero_right = np.nonzero(res_mask)[0] + 1
    res_mask[arr_diff_zero_right] = True
    return np.nonzero(res_mask)[0]

字符串

200万元素

x1c 0d1x的数据

20 k个元素


完整测试代码

import random
from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
from collections import defaultdict
import time

def find_repeats(arr: np.ndarray) -> np.ndarray:
    """Find indices of repeat values in an array.

    Args:
        arr (np.ndarray): An array to find repeat values in.

    Returns:
        np.ndarray: An array of indices into arr which are the values which
            repeat.
    """

    arr_diff = np.diff(arr, append=[arr[-1] + 1])
    res_mask = arr_diff == 0
    arr_diff_zero_right = np.nonzero(res_mask)[0] + 1
    res_mask[arr_diff_zero_right] = True
    return np.nonzero(res_mask)[0]

def dd(l):
    # default_dict test
    indices = defaultdict(list)
    for i, v in enumerate(l):
        indices[v].append(i)
    return indices

def npw(l):
    # np_where test
    return {v: np.where(l == v)[0] for v in np.unique(l)}

def uni(records_array):
    # np_unique test
    idx_sort = np.argsort(records_array)
    sorted_records_array = records_array[idx_sort]
    vals, idx_start, count = np.unique(
        sorted_records_array, return_counts=True, return_index=True)
    res = np.split(idx_sort, idx_start[1:])
    return dict(zip(vals, res))

def daf(l):
    # pandas test
    return pd.DataFrame(l).groupby([0]).indices

data = defaultdict(list)

for x in range(4, 20000, 1000):  # number of unique elements
    print(f"{x} trial done")
    # create 2M element list
    random.seed(365)
    a = np.array([random.choice(range(x)) for _ in range(2000000)])
    num_runs = 2
    t0 = time.time()
    for i in range(num_runs):
        dd(a)
    res1 = time.time() - t0

    t0 = time.time()
    for i in range(num_runs):
        uni(a)
    res3 = time.time() - t0

    t0 = time.time()
    for i in range(num_runs):
        daf(a)
    res4 = time.time() - t0

    t0 = time.time()
    for i in range(num_runs):
        find_repeats(a)
    res5 = time.time() - t0

    data['defaut_dict'].append(res1 / num_runs)
    data['np_unique'].append(res3 / num_runs)
    data['pandas'].append(res4 / num_runs)
    data['np_diff'].append(res5 / num_runs)
    data['idx'].append(x)

df = pd.DataFrame(data)
df.set_index('idx', inplace=True)

df.plot(figsize=(12, 5), xlabel='unique samples',
        ylabel='average time (s)', title='%timeit test: 2 run 1 loop each')
plt.legend(bbox_to_anchor=(1.0, 1), loc='upper left')
plt.show()

1u4esq0p

1u4esq0p5#

所以我无法摆脱for循环,但我能够将其配对为使用for循环,并使用set数据类型和list.count()方法:

data = [1,2,3,1,4,5,2,2]
indivs = set(data)

multi_index = lambda lst, val: [i for i, x in enumerate(lst) if x == val]

if data != list(indivs):
    dupes = [multi_index(data, i) for i in indivs if data.count(i) > 1]

字符串
在这里,你循环遍历indivs集合,它包含值(没有重复),然后如果你发现一个有重复的项目,循环遍历整个列表。如果这对你来说不够快,我正在寻找numpy替代方案。如果需要的话,生成器对象也可以加快速度。
编辑:gg 349的答案包含了我正在研究的numpy解决方案!

cxfofazt

cxfofazt6#

你可以做一些沿着线:

1. add original index ref so [[1,0],[2,1],[3,2],[1,3],[1,4]...
2. sort on [:,0]
3. use np.where(ra[1:,0] != ra[:-1,0])
4. use the list of indexes from above to construct your final list of lists

字符串
编辑-好的,所以在我快速回复后,我已经离开了一段时间,我看到我已经被投票否决了,这是公平的,因为numpy.argsort()是一个比我的建议好得多的方法。我确实投票赞成numpy.unique()的答案,因为这是一个有趣的功能。但是,如果你使用timeit,你会发现,

idx_start = np.where(sorted_records_array[:-1] != sorted_records_array[1:])[0] + 1
res = np.split(idx_sort, idx_start)

vals, idx_start = np.unique(sorted_records_array, return_index=True)
res = np.split(idx_sort, idx_start[1:])


进一步编辑以下问题@尼古拉斯
我不确定你能做到。有可能得到两个对应于断点的索引数组,但是你不能用np把数组的不同“行”分成不同大小的块。

a = np.array([[4,27,42,12, 4 .. 240, 12], [3,65,23...] etc])
idx = np.argsort(a, axis=1)
sorted_a = np.diagonal(a[:, idx[:]]).T
idx_start = np.where(sorted_a[:,:-1] != sorted_a[:,1:])

# idx_start => (array([0,0,0,..1,1,..]), array([1,4,6,7..99,0,4,5]))


但这可能已经足够好了,这取决于你想用这些信息做什么。

5fjcxozz

5fjcxozz7#

import numpy as np
from numpy.lib import recfunctions as rfn

ndtype = [('records_array', int)] # Check the data type
records_array = np.ma.array([1, 2, 1, 3, 2, 3, 3, 4, 5]).view(ndtype) # Structured array
idxs = list(rfn.find_duplicates(records_array, key=None, ignoremask=True, return_index=True)[1]) # List of indices of repeated elements

字符串

4dbbbstv

4dbbbstv8#

np.unique所有索引

@gg349的解决方案打包成一个函数:

def np_unique_indices(arr, **kwargs):
    """Unique indices for N-D arrays."""
    vals, indices, *others = np_unique_indices_1d(arr.reshape(-1), **kwargs)
    indices = [np.stack(np.unravel_index(x, arr.shape)) for x in indices]
    return vals, indices, *others

def np_unique_indices_1d(arr, **kwargs):
    """Unique indices for 1D arrays."""
    sort_indices = np.argsort(arr)
    arr = np.asarray(arr)[sort_indices]
    vals, first_indices, *others = np.unique(
        arr, return_index=True, **kwargs
    )
    indices = np.split(sort_indices, first_indices[1:])
    for x in indices:
        x.sort()
    return vals, indices, *others

字符串
它本质上与np.unique相同,但返回 all 索引,而不仅仅是 first 索引。
示例用法:

arr = np.array([
    [0, 1, 1, 0],
    [0, 2, 2, 0],
    [0, 2, 2, 0],
    [0, 1, 1, 0],
])

vals, indices = np_unique_indices(arr)

for val, idx in zip(vals, indices):
    print(f"{val}:\n{idx}\n")


输出量:

0:
[[0 0 1 1 2 2 3 3]
 [0 3 0 3 0 3 0 3]]

1:
[[0 0 3 3]
 [1 2 1 2]]

2:
[[1 1 2 2]
 [1 2 1 2]]

sauutmhj

sauutmhj9#

numba.jit

另一种解决方案,但使用numba.jit

def np_unique_indices(arr, **kwargs):
    """Unique indices for N-D arrays."""
    vals, indices = np_unique_indices_1d(arr.reshape(-1))
    vals = np.asarray(vals)
    indices = [np.stack(np.unravel_index(x, arr.shape)) for x in indices]
    return vals, indices

@numba.njit
def np_unique_indices_1d(arr):
    """Unique indices for 1D arrays."""
    idxs = [[0 for _ in range(0)] for _ in range(0)]
    ptr = {}
    ptr_count = 0

    for i, x in enumerate(arr):
        if (x in ptr) == False:
            idxs.append([0 for _ in range(0)])
            ptr[x] = ptr_count
            ptr_count += 1
        idxs[ptr[x]].append(i)

    vals = [x for x in ptr]
    idxs = [np.array(x) for x in idxs]
    return vals, idxs

字符串
使用@特伦顿麦金尼和用户27443的基准测试:
x1c 0d1x的数据
请注意,所有解决方案的性能都取决于数组的大小和唯一标签的数量,因此我建议您自己测试自己的数据。

相关问题