python 如何在离散化的同时动态计算重叠范围的插入索引?

bis0qfac  于 2023-06-28  发布在  Python
关注(0)|答案(1)|浏览(121)

我写了一个脚本来处理重叠的间隔,我以前做过几次尝试,我不会在这里详细介绍,你可以看到this来获得更多的上下文。这个版本是我想出的最聪明的,它是最高效的,经过严格的测试我已经确定它的输出是完全正确的。
这个问题可以简化为,给定大量的三元组(a, b, c),在每个三元组中ab是整数,b >= a,每个三元组表示一个范围range(a, b + 1)c是与该范围相关的数据。(a, b, c)相当于{range(a, b + 1): c}
在范围中有很多重叠,并且相邻的范围共享数据。
假设有两个这样的范围,分别称为(s0, e0, d0)(s1, e1, d1),可能会发生以下3种不良情况之一:

  • s0 <= s1 <= e1 < e0,则第二个范围完全包含在第一个范围内,并且d0 == d1
  • 第二个范围是第一个范围的子范围,如上所述,但是d0 != d1
  • e0 == s1 - 1 and d0 == d1

以下情况永远不会发生:

  • e0 == s1s0 <= s1 and e0 <= e1

这些范围要么是完全离散的,要么一个是另一个的完整子集,没有部分交叉。
现在我想处理范围,使其没有重叠,并且所有相邻的范围共享不同的数据,这需要对上述3种情况中的每一种进行以下操作:

  • 忽略子范围,不做任何事情,因为它们共享相同的数据
  • 拆分父范围,删除父范围的重叠部分,并用子范围的数据覆盖重叠范围。
  • 忽略第二个范围,并将e0设置为e1,以便合并两个范围。

结果不应包含重叠范围和额外范围,每个范围只有一个基准面,并且始终从较新的较小范围继承。
输入的排序方式可以在Python中通过使用lambda x: (x[0], -x[1])作为键来获得。如果输入没有排序,我会在处理它之前以这种方式排序。
这是一个小例子:
数据:

[(0, 10, 'A'), (0, 1, 'B'), (2, 5, 'C'), (3, 4, 'C'), (6, 7, 'C'), (8, 8, 'D')]

数据含义:

[
    {0: 'A', 1: 'A', 2: 'A', 3: 'A', 4: 'A', 5: 'A', 6: 'A', 7: 'A', 8: 'A', 9: 'A', 10: 'A'},
    {0: 'B', 1: 'B'},
    {2: 'C', 3: 'C', 4: 'C', 5: 'C'},
    {3: 'C', 4: 'C'},
    {6: 'C', 7: 'C'},
    {8: 'D'}
]

我的预期过程相当于简单地按顺序处理它们,并根据当前条目逐个更新字典。
处理数据:

{0: 'B', 1: 'B', 2: 'C', 3: 'C', 4: 'C', 5: 'C', 6: 'C', 7: 'C', 8: 'D', 9: 'A', 10: 'A'}

输出:

[(0, 1, 'B'), (2, 7, 'C'), (8, 8, 'D'), (9, 10, 'A')]

我需要尽可能高效地完成这项工作,因为我实际上有数百万个条目要处理,这并不夸张。我试着使用intervaltree,但它对我的目的来说太低效了:

In [3]: %timeit merge_rows([[0, 10, 'A'], [0, 1, 'B'], [2, 5, 'C'], [3, 4, 'C'], [6, 7, 'C'],[8, 8, 'D']])
499 µs ± 14.5 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

以下是我的代码

from bisect import bisect_left

class Interval:
    instances = []

    def __init__(self, start, end, data):
        self.start = start
        self.end = end
        self.data = data
        self.ends = [end]
        self.children = []
        Interval.instances.append(self)

    def variate(self, start, data):
        return not self.start <= start <= self.end or data != self.data

    def distinct(self, start, end, data):
        if self.end == start - 1 and self.data == data:
            self.ends[-1] = self.end = end
            return False
        return True

    def fill(self, start):
        if self.start < start:
            before = start - 1
            self.ends.insert(-1, before)
            self.children.append(Interval(self.start, before, self.data))

    def merge_worker(self, start, end, data):
        self.fill(start)
        add = self.start < start
        if not add:
            add = not self.children or self.children[-1].distinct(start, end, data)
        if add:
            self.children.append(Interval(start, end, data))
            self.ends.insert(-1, end)
        self.start = end + 1

    def merge(self, start, end, data):
        if not (self.variate(start, data) and self.distinct(start, end, data)):
            return
        if self.start <= start:
            self.merge_worker(start, end, data)
        else:
            self.children[bisect_left(self.ends, start)].merge(start, end, data)

def discretize(seq):
    Interval.instances.clear()
    current = -1e309
    for start, end, data in seq:
        if start > current + 1:
            top = Interval(start, end, data)
            current = end
        else:
            top.merge(start, end, data)
    return sorted(((i.start, i.end, i.data) for i in Interval.instances))

而且效率更高。

In [305]: %timeit discretize([[0, 10, 'A'], [0, 1, 'B'], [2, 5, 'C'], [3, 4, 'C'], [6, 7, 'C'],[8, 8, 'D']])
11 µs ± 85.4 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)

我用下面的代码来测试它:
check.py (Google drive)
英语不是我的母语,我不擅长单词,所以我不会描述我的方法,代码不难理解。
我的问题是,如何动态计算的插入索引,以便直接按顺序插入范围,从而消除对结果排序的需要?结果必须是有序的,排序真的很昂贵。
我可以在创建每个示例时给它附加一个索引,但到目前为止我只知道如何使用连续整数作为索引,只是在创建一个示例时增加一个,我不知道如何动态计算索引以使示例保持排序。
我还没有实现我想要的一切,所以我不能在Code Review上发布这篇文章。我如何计算所需的指数?

wydwbb8l

wydwbb8l1#

这类问题通常用sweep line algorithm来解决。
想象一下,沿着x轴从-无穷大到无穷大扫一条垂直线。当它遇到任何间隔的开始或结束时,“当前数据”可能会改变。所有这些时刻都是“事件”,通过跟踪每个事件时当前数据的变化,可以轻松计算输出间隔。
对于您的问题,我们可以简单地对间隔进行排序以获得开始事件的序列。在每个开始事件中,我们可以将开始的间隔添加到“当前间隔”的集合中。下一个结束事件则是当前间隔中最先结束的间隔之后的数字。我们可以通过在优先级队列中保留当前间隔来跟踪这一点。
最后,请注意,如果两个间隔重叠,则最先结束的间隔具有优先权,在平局的情况下,最后开始的间隔获胜。如果我们按照(end,-start)对优先级队列进行排序,那么第一个当前间隔将为我们提供当前数据和下一个结束事件。
把所有这些放在一起,下面是一个Python实现:

import heapq

def descretize(intervals):
    if len(intervals) < 1:
        return []
    intervals = sorted(intervals)
    currentData = None
    out = []

    # read position in interval list
    inpos = 0

    # currently open output interval
    currentStart = None
    currentData = None

    # end events [(end_pos+1, -start_pos, data)]
    endq = []

    while inpos < len(intervals):

        # get the next event position
        evpos = intervals[inpos][0]
        if len(endq) > 0 and endq[0][0] < evpos:
            evpos = endq[0][0]
        
        # remove intervals that are no longer current
        while len(endq) > 0 and endq[0][0] <= evpos:
            heapq.heappop(endq)
        
        # add new current intervals
        while inpos < len(intervals) and intervals[inpos][0] <= evpos:
            s, e, data = intervals[inpos]
            inpos += 1
            heapq.heappush(endq,(e+1,-s,data))
        
        # get next current data
        nextData = endq[0][2] if len(endq) > 0 else None
        if nextData != currentData:
            # data changes here
            if currentData != None:
                out.append((currentStart, evpos-1, currentData))
            currentStart = evpos
            currentData = nextData

    # process remaining end events
    while len(endq) > 0:
        evpos = heapq.heappop(endq)[0]

        # remove intervals that are no longer current
        while len(endq) > 0 and endq[0][0] <= evpos:
            heapq.heappop(endq)

        # get next current data
        nextData = endq[0][2] if len(endq) > 0 else None
        if nextData != currentData:
            # data changes here
            if currentData != None:
                out.append((currentStart, evpos-1, currentData))
            currentStart = evpos
            currentData = nextData
    
    return out

相关问题