我写了一个脚本来处理重叠的间隔,我以前做过几次尝试,我不会在这里详细介绍,你可以看到this来获得更多的上下文。这个版本是我想出的最聪明的,它是最高效的,经过严格的测试我已经确定它的输出是完全正确的。
这个问题可以简化为,给定大量的三元组(a, b, c)
,在每个三元组中a
和b
是整数,b >= a
,每个三元组表示一个范围range(a, b + 1)
,c
是与该范围相关的数据。(a, b, c)
相当于{range(a, b + 1): c}
。
在范围中有很多重叠,并且相邻的范围共享数据。
假设有两个这样的范围,分别称为(s0, e0, d0)
和(s1, e1, d1)
,可能会发生以下3种不良情况之一:
s0 <= s1 <= e1 < e0
,则第二个范围完全包含在第一个范围内,并且d0 == d1
- 第二个范围是第一个范围的子范围,如上所述,但是
d0 != d1
e0 == s1 - 1 and d0 == d1
以下情况永远不会发生:
e0 == s1
和s0 <= s1 and e0 <= e1
这些范围要么是完全离散的,要么一个是另一个的完整子集,没有部分交叉。
现在我想处理范围,使其没有重叠,并且所有相邻的范围共享不同的数据,这需要对上述3种情况中的每一种进行以下操作:
- 忽略子范围,不做任何事情,因为它们共享相同的数据
- 拆分父范围,删除父范围的重叠部分,并用子范围的数据覆盖重叠范围。
- 忽略第二个范围,并将
e0
设置为e1
,以便合并两个范围。
结果不应包含重叠范围和额外范围,每个范围只有一个基准面,并且始终从较新的较小范围继承。
输入的排序方式可以在Python中通过使用lambda x: (x[0], -x[1])
作为键来获得。如果输入没有排序,我会在处理它之前以这种方式排序。
这是一个小例子:
数据:
[(0, 10, 'A'), (0, 1, 'B'), (2, 5, 'C'), (3, 4, 'C'), (6, 7, 'C'), (8, 8, 'D')]
数据含义:
[
{0: 'A', 1: 'A', 2: 'A', 3: 'A', 4: 'A', 5: 'A', 6: 'A', 7: 'A', 8: 'A', 9: 'A', 10: 'A'},
{0: 'B', 1: 'B'},
{2: 'C', 3: 'C', 4: 'C', 5: 'C'},
{3: 'C', 4: 'C'},
{6: 'C', 7: 'C'},
{8: 'D'}
]
我的预期过程相当于简单地按顺序处理它们,并根据当前条目逐个更新字典。
处理数据:
{0: 'B', 1: 'B', 2: 'C', 3: 'C', 4: 'C', 5: 'C', 6: 'C', 7: 'C', 8: 'D', 9: 'A', 10: 'A'}
输出:
[(0, 1, 'B'), (2, 7, 'C'), (8, 8, 'D'), (9, 10, 'A')]
我需要尽可能高效地完成这项工作,因为我实际上有数百万个条目要处理,这并不夸张。我试着使用intervaltree
,但它对我的目的来说太低效了:
In [3]: %timeit merge_rows([[0, 10, 'A'], [0, 1, 'B'], [2, 5, 'C'], [3, 4, 'C'], [6, 7, 'C'],[8, 8, 'D']])
499 µs ± 14.5 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
以下是我的代码
from bisect import bisect_left
class Interval:
instances = []
def __init__(self, start, end, data):
self.start = start
self.end = end
self.data = data
self.ends = [end]
self.children = []
Interval.instances.append(self)
def variate(self, start, data):
return not self.start <= start <= self.end or data != self.data
def distinct(self, start, end, data):
if self.end == start - 1 and self.data == data:
self.ends[-1] = self.end = end
return False
return True
def fill(self, start):
if self.start < start:
before = start - 1
self.ends.insert(-1, before)
self.children.append(Interval(self.start, before, self.data))
def merge_worker(self, start, end, data):
self.fill(start)
add = self.start < start
if not add:
add = not self.children or self.children[-1].distinct(start, end, data)
if add:
self.children.append(Interval(start, end, data))
self.ends.insert(-1, end)
self.start = end + 1
def merge(self, start, end, data):
if not (self.variate(start, data) and self.distinct(start, end, data)):
return
if self.start <= start:
self.merge_worker(start, end, data)
else:
self.children[bisect_left(self.ends, start)].merge(start, end, data)
def discretize(seq):
Interval.instances.clear()
current = -1e309
for start, end, data in seq:
if start > current + 1:
top = Interval(start, end, data)
current = end
else:
top.merge(start, end, data)
return sorted(((i.start, i.end, i.data) for i in Interval.instances))
而且效率更高。
In [305]: %timeit discretize([[0, 10, 'A'], [0, 1, 'B'], [2, 5, 'C'], [3, 4, 'C'], [6, 7, 'C'],[8, 8, 'D']])
11 µs ± 85.4 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
我用下面的代码来测试它:
check.py (Google drive)
英语不是我的母语,我不擅长单词,所以我不会描述我的方法,代码不难理解。
我的问题是,如何动态计算的插入索引,以便直接按顺序插入范围,从而消除对结果排序的需要?结果必须是有序的,排序真的很昂贵。
我可以在创建每个示例时给它附加一个索引,但到目前为止我只知道如何使用连续整数作为索引,只是在创建一个示例时增加一个,我不知道如何动态计算索引以使示例保持排序。
我还没有实现我想要的一切,所以我不能在Code Review上发布这篇文章。我如何计算所需的指数?
1条答案
按热度按时间wydwbb8l1#
这类问题通常用sweep line algorithm来解决。
想象一下,沿着x轴从-无穷大到无穷大扫一条垂直线。当它遇到任何间隔的开始或结束时,“当前数据”可能会改变。所有这些时刻都是“事件”,通过跟踪每个事件时当前数据的变化,可以轻松计算输出间隔。
对于您的问题,我们可以简单地对间隔进行排序以获得开始事件的序列。在每个开始事件中,我们可以将开始的间隔添加到“当前间隔”的集合中。下一个结束事件则是当前间隔中最先结束的间隔之后的数字。我们可以通过在优先级队列中保留当前间隔来跟踪这一点。
最后,请注意,如果两个间隔重叠,则最先结束的间隔具有优先权,在平局的情况下,最后开始的间隔获胜。如果我们按照(end,-start)对优先级队列进行排序,那么第一个当前间隔将为我们提供当前数据和下一个结束事件。
把所有这些放在一起,下面是一个Python实现: