c# 如何将事件时间标签快速排序到多个bin中?

ddarikpa  于 2023-10-14  发布在  C#
关注(0)|答案(1)|浏览(193)

我有一个表示检测事件的事件时间标签流(~ 1 '000'000)。时间标签是指示在该时间段内是否发生检测的单位。如果没有时间标签,则在该时间段内没有发生检测。该列表是单调排序的。除此之外,我有一个定时表,它定义了我想要排序时间标签的某些通道。如果在时间表中指定的特定时间段内发生了一个或多个检测事件,则我们说在相应通道上发生了事件(即,对于时序表条目,结果为“真”)。定时表描述了重复的通道单元。
到目前为止,我基本上对所有的时间标签都执行了嵌套的for循环。对于每个标记,我运行第二个循环来检查所有时序表条目。然而,这种方法非常缓慢。有没有更好的算法来更快地实现这一点?
在伪代码中,我做了什么:

tags = [8,500,1001,1265,1266,1502,1600,2990]
table = [0,1260,1480]
window = 10
single_run_duration = 1500
reps = ceil(max(tags)/single_run_duration)
storage = zero_array(len(table),reps)

for ti in tags:
   for ch in table:
      if ch <= (ti % single_run_duration) <= ch+window:
         run = floor(ti/single_run_duration)
         storage(ch,run) = True
// Storage contains afterwards:
storage = [[True,True],[True,False],[False,False]]

编辑:窗口不重叠,表格也是单调有序的。

k97glaaz

k97glaaz1#

一种方法是从通道时序表创建一个(完美的)哈希表,这样就可以避免O(N)扫描或O(ln N)查找。

storage = np.zeros((len(table), reps), dtype=int)

lookup = [None] * single_run_duration

for ch_index, ch in enumerate(table):
    for i in range(ch, min(ch+window+1, single_run_duration)):
        lookup[i] = [ch_index] if not lookup[i] else lookup[i] + [ch_index]

for ti in tags:
    channels = lookup[ti % single_run_duration]
    if not channels:
        continue
    for ch_index in channels:
        run = math.floor(ti/single_run_duration)
        storage[ch_index, run] = 1

在Python中,对于100万个标记和大约300个表条目,这大约需要1.9秒,而对于您提供的线性扫描算法,这需要44秒。
这将时间Map到通道索引列表,如果已知窗口从不重叠,则可以稍微简化。
Python测试基础设施:

import numpy as np
import math
import sys
import timeit

tags = np.cumsum(np.random.randint(1, 501, 1000_000))
table = np.cumsum(np.random.randint(1, 10, 1500))
window = 10
single_run_duration = 1500
table= table[table < single_run_duration - window]
reps = math.ceil(max(tags)/single_run_duration)

# original
def map_1():
   storage = np.zeros((len(table),reps), dtype=int)

   for ti in tags:
      for ch_index, ch in enumerate(table):
         if ch <= (ti % single_run_duration) <= ch+window:
            run = math.floor(ti/single_run_duration)
            storage[ch_index,run] = 1

   return storage

timeit.timeit(map_1, number=1)

# using hashmap
def map_2():
    storage = np.zeros((len(table), reps), dtype=int)

    lookup = [None] * single_run_duration

    for ch_index, ch in enumerate(table):
        for i in range(ch, min(ch+window+1, single_run_duration)):
            lookup[i] = [ch_index] if not lookup[i] else lookup[i] + [ch_index]

    for ti in tags:
        channels = lookup[ti % single_run_duration]
        if not channels:
            continue
        for ch_index in channels:
            run = math.floor(ti/single_run_duration)
            storage[ch_index, run] = 1
    
    return storage

timeit.timeit(map_2, number=1)

assert (map_1() == map_2()).all()

相关问题