python-3.x 如何有效地合并重叠范围?

0h4hbjxa  于 2023-08-08  发布在  Python
关注(0)|答案(3)|浏览(106)

我有来自多个csv文件的数据,我需要将表合并到一个表中。所讨论的数据是GeoLite 2数据库的文本转储,实际上有数百万行,简单地将数据加载到列表中需要2927 MiB。
这些表包含关于IP网络的信息,一些表包含关于ASN的信息,一些关于城市的信息,还有一些关于国家的信息,这些表具有不同的键(IP网络),并且它们可能包含公共键,我打算将这些表合并到一个表中,其中包含关于所列所有网络的ASN、国家和城市的信息。
这个问题和我以前的question有关,但它是不同的。
想象一个无限的盒子排列成一行,它们使用唯一的整数编号,并且所有的盒子最初都是空的。这一次,所有的盒子都可以容纳无限多个值,但它们只能容纳唯一的值。也就是说,如果你把A放进盒子0,盒子0就包含了A,但是在那之后,无论你把A放进盒子0多少次,盒子0总是恰好包含A的1个示例。但是如果你把B放进盒子0,盒子0就包含了A和B。但是如果你再把B放进盒子里,盒子0仍然包含A的1个示例和B的1个示例。
现在有很多三元组,前两个元素是整数,它们对应一个整数范围的开始和结束(含),每个三元组描述了一个连续的整数范围的盒子(意味着每个盒子的数目是前一个盒子的数目加1)与同一对象。
例如,(0, 10, 'A')意味着框0到10包含'A'的示例。
任务是合并来自三元组的信息,并以最少数量的三元组描述盒子的状态,在这种情况下,第三个元素是set s。
输入(0, 10, 'A')->输出(0, 10, {'A'}),说明:框0到10包含'A'的示例。
输入(0, 10, 'A'), (11, 20, 'A')->输出(0, 20, {'A'}),说明:框0到10包含'A'的示例,框11到20也包含'A'的示例,11是10 + 1,所以框0到20包含'A'的示例。
输入(0, 10, 'A'), (20, 30, 'A')->输出(0, 10, {'A'}), (20, 30, {'A'}),说明:框0到10包含一个'A'的示例,框20到30也包含一个'A'的示例,所有其他框都是空的,并且20与10不相邻,不要合并。
输入(0, 10, 'A'), (11, 20, 'B')->输出(0, 10, {'A'}), (11, 20, {'B'})
输入(0, 10, 'A'), (2, 8, 'B')->输出(0, 1, {'A'}), (2, 8, {'A', 'B'}), (9, 10, {'A'}),说明:框0到10具有'A',而框2到8具有'B',因此框2到8具有{'A', 'B'}
输入(0, 10, 'A'), (5, 20, 'B')->输出(0, 4, {'A'}), (5, 10, {'A', 'B'}), (11, 20, {'B'})说明:同上。
输入(0, 10, 'A'), (5, 10, 'A')->输出(0, 10, {'A'}),说明:框0到10具有'A',第二个三元组没有添加新信息,并且是垃圾,丢弃它。
我当前的代码为一些测试用例生成正确的输出,但为其他测试用例生成KeyError

import random
from collections import defaultdict
from typing import Any, List, Tuple

def get_nodes(ranges: List[Tuple[int, int, Any]]) -> List[Tuple[int, int, Any]]:
    nodes = []
    for ini, fin, data in ranges:
        nodes.extend([(ini, False, data), (fin, True, data)])
    return sorted(nodes)

def combine_gen(ranges):
    nodes = get_nodes(ranges)
    stack = set()
    actions = []
    for node, end, data in nodes:
        if not end:
            if (action := (data not in stack)):
                if stack and start < node:
                    yield start, node - 1, stack.copy()
                stack.add(data)
                start = node
            actions.append(action)
        elif actions.pop(-1):
            if start <= node:
                yield start, node, stack.copy()
                start = node + 1
            stack.remove(data)

def merge(segments):
    start, end, data = next(segments)
    for start2, end2, data2 in segments:
        if end + 1 == start2 and data == data2:
            end = end2
        else:
            yield start, end, data
            start, end, data = start2, end2, data2
    yield start, end, data

def combine(ranges):
    return list(merge(combine_gen(ranges)))

字符串
它为以下测试用例生成正确的输出:

sample1 = [(0, 20, 'A'), (10, 40, 'B'), (32, 50, 'C'), (40, 50, 'D'), (45, 50, 'E'), (70, 80, 'F'), (90, 100, 'G'), (95, 120, 'H'), (131, 140, 'I'), (140, 150, 'J')]
sample2 = [(0, 10, 'A'), (0, 1, 'B'), (2, 5, 'C'), (3, 4, 'C'), (6, 7, 'C'), (8, 8, 'D'), (110, 150, 'E'), (250, 300, 'C'), (256, 270, 'D'), (295, 300, 'E'), (500, 600, 'F')]
sample3 = [(0, 100, 'A'), (10, 25, 'B'), (15, 25, 'C'), (20, 25, 'D'), (30, 50, 'E'), (40, 50, 'F'), (60, 80, 'G'), (150, 180, 'H')]
sample4 = [(0, 16, 'red'), (0, 4, 'green'), (2, 9, 'blue'), (2, 7, 'cyan'), (4, 9, 'purple'), (6, 8, 'magenta'), (9, 14, 'yellow'), (11, 13, 'orange'), (18, 21, 'green'), (22, 25, 'green')]


我不会在这里包括它们的预期输出,运行我的代码,你会发现输出是什么,输出是正确的。
我写了一个函数来制作测试用例和一个保证正确但低效的解决方案,我的高效代码在馈送机器生成输入时会引发KeyError

def make_generic_case(num, lim, dat):
    ranges = []

    for _ in range(num):
        start = random.randrange(lim)
        end = random.randrange(lim)
        if start > end:
            start, end = end, start
        ranges.append([start, end, random.randrange(dat)])

    ranges.sort(key=lambda x: (x[0], -x[1]))
    return ranges

def bruteforce_combine(ranges):
    boxes = defaultdict(set)
    for start, end, data in ranges:
        for n in range(start, end + 1):
            boxes[n].add(data)
    
    boxes = sorted(boxes.items())
    output = []
    lo, cur = boxes.pop(0)
    hi = lo

    for n, data in boxes:
        if cur == data and n - hi == 1:
            hi = n
        else:
            output.append((lo, hi, cur))
            lo = hi = n
            cur = data

    output.append((lo, hi, cur))
    return output


因为我的代码 * 不能正常工作,所以我不能在Code Review* 上发布它,因为Code Review只审查工作的代码,而我的不是。

  • 使用make_generic_case(512, 4096, 16)获得测试用例并验证所建议解决方案是否正确,bruteforce_combine的输出需要答案 *,bruteforce_combine根据定义是正确的(我的逻辑是defaultdict(set))。

合并重叠范围的更有效方法是什么?
现有的两个答案都不理想,第一个给出了正确的结果,但效率非常低,永远不会处理我的数百万行:

In [5]: for _ in range(256):
   ...:     case = make_generic_case(512, 4096, 16)
   ...:     assert bruteforce_combine(case) == combine(case)

In [6]: case = make_generic_case(512, 4096, 16)

In [7]: %timeit combine(case)
9.3 ms ± 35 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


第二种方法更有效,但我还没有彻底测试过。
我已经从第二个答案中确认了代码的正确性,并将其重写为以下内容:

from collections import Counter

def get_nodes(ranges):
    nodes = []
    for start, end, label in ranges:
        nodes.extend(((start, 0, label), (end + 1, 1, label)))

    return sorted(nodes)

def combine(ranges):
    if not ranges:
        return []
    nodes = get_nodes(ranges)
    labels = set()
    state = Counter()
    result = []
    start = nodes[0][0]
    for node, is_end, label in nodes:
        state[label] += [1, -1][is_end]
        count = state[label]
        if (is_end, count) in {(0, 1), (1, 0)}:
            if start < node:
                if not count or labels:
                    result.append((start, node - 1, labels.copy()))

                start = node

            (labels.remove, labels.add)[count](label)

    return result


它仍然非常低效,我需要处理数百万行:

In [2]: for _ in range(128):
   ...:     case = make_generic_case(256, 4096, 16)
   ...:     assert bruteforce_combine(case) == combine(case)

In [3]: for _ in range(2048):
   ...:     case = make_generic_case(512, 2048, 16)
   ...:     assert bruteforce_combine(case) == combine(case)

In [4]: case = make_generic_case(2048, 2**64, 32)

In [5]: %timeit combine(case)
4.19 ms ± 112 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

In [6]: case = make_generic_case(32768, 2**64, 32)

In [7]: %timeit combine(case)
116 ms ± 1.11 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

In [8]: case = make_generic_case(1048576, 2**64, 32)

In [9]: %timeit combine(case)
5.12 s ± 30.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


我有来自6个巨大CSV文件的数据,总行数是:

In [74]: 495209+129884+3748518+1277097+429639+278661
Out[74]: 6359008


这是远远超过600万,仅将数据加载到RAM需要2.9GiB,而我只有16 GiB RAM。我需要一个在时间复杂度和空间复杂度上都更有效的解决方案。

uurity8g

uurity8g1#

这里有一个简单的策略。
您将从(start, end, label)形式的元组列表开始。
将每个元组转换为2:(start, 0, label), (end+1, 1, label)的值。
排序元组。请注意,这将在下一个框的开始之后在上一个框上放置一个结束,以便我们可以合并范围。
通过他们,并产生答案。请注意,我们总是发出在我们当前处理的框之前结束的范围。
对于一个大型数据集,我实际上会将中间表单放入一个文件中,在Python之外对其进行排序,然后从那里加载。但这是一个实施细节。下面是一些示例代码。

def add_sample (sample, merged=None):
    if merged is None:
        merged = []
    for (start, end, label) in sample:
        merged.append((start, 0, label))
        merged.append((end+1, 1, label))
    return merged

def extract_result (merged):
    merged = sorted(merged)
    state = {}
    if 0 == len(merged):
        return []
    prev_labels = set()
    labels = set()
    state = {}
    answer = []
    start_time = merged[0][0]
    for (time, is_end, label) in merged:
        if is_end == 0:
            new_count = state.get(label, 0) + 1
            if new_count == 1:
                if start_time < time:
                    if 0 < len(labels):
                        answer.append((start_time, time-1, list(sorted(labels))))
                    start_time = time
                labels.add(label)
            state[label] = new_count
        else:
            new_count = state.get(label, 0) - 1
            if new_count == 0:
                if start_time < time:
                    prev_labels = labels.copy()
                    answer.append((start_time, time-1, list(sorted(labels))))
                    start_time = time
                labels.remove(label)
            state[label] = new_count

    return answer

sample1 = [(0, 20, 'A'), (10, 40, 'B'), (32, 50, 'C'), (40, 50, 'D'), (45, 50, 'E'), (70, 80, 'F'), (90, 100, 'G'), (95, 120, 'H'), (131, 140, 'I'), (140, 150, 'J')]
sample2 = [(0, 10, 'A'), (0, 1, 'B'), (2, 5, 'C'), (3, 4, 'C'), (6, 7, 'C'), (8, 8, 'D'), (110, 150, 'E'), (250, 300, 'C'), (256, 270, 'D'), (295, 300, 'E'), (500, 600, 'F')]
sample3 = [(0, 100, 'A'), (10, 25, 'B'), (15, 25, 'C'), (20, 25, 'D'), (30, 50, 'E'), (40, 50, 'F'), (60, 80, 'G'), (150, 180, 'H')]
sample4 = [(0, 16, 'red'), (0, 4, 'green'), (2, 9, 'blue'), (2, 7, 'cyan'), (4, 9, 'purple'), (6, 8, 'magenta'), (9, 14, 'yellow'), (11, 13, 'orange'), (18, 21, 'green'), (22, 25, 'green')]

merged = add_sample(sample1)
merged = add_sample(sample2, merged)
merged = add_sample(sample3, merged)
merged = add_sample(sample4, merged)
print(extract_result(merged))

字符串

ufj5ltwl

ufj5ltwl2#

基于@InSync的提示,我创建了一个解决方案,它将关键点收集在字典中并对其进行处理:

import random
from collections import defaultdict, Counter

# Original code

def make_generic_case(num, lim, dat):
    ranges = []

    for _ in range(num):
        start = random.randrange(lim)
        end = random.randrange(lim)
        if start > end:
            start, end = end, start
        ranges.append([start, end, random.randrange(dat)])

    ranges.sort(key=lambda x: (x[0], -x[1]))
    return ranges

def bruteforce_combine(ranges):
    boxes = defaultdict(set)
    for start, end, data in ranges:
        for n in range(start, end + 1):
            boxes[n].add(data)
    
    boxes = sorted(boxes.items())
    output = []
    lo, cur = boxes.pop(0)
    hi = lo

    for n, data in boxes:
        if cur == data and n - hi == 1:
            hi = n
        else:
            output.append((lo, hi, cur))
            lo = hi = n
            cur = data

    output.append((lo, hi, cur))
    return output

# My code

def crit_points(ranges):
    result = defaultdict(list)
    
    for r in ranges:
        result[r[1] + 1].append((False, r[2]))

    for r in ranges:
        result[r[0]].append((True, r[2]))

    return sorted(result.items())

def combine(ranges):
    if len(ranges) == 0:
        return []

    active_count = Counter()
    active_idx = 0

    result = []

    for idx, changes in crit_points(ranges):

        setcopy = set(+active_count)

        for s, v in changes:

            if s:
                active_count[v] += 1
            else:
                active_count[v] -= 1

        active_count = +active_count

        if idx > active_idx and setcopy != set(active_count):

            if setcopy:
                result.append((active_idx, idx - 1, setcopy))
            
            active_idx = idx

            

    if idx > active_idx:
            result.append((active_idx, idx - 1, setcopy))

    return result


# Simple test and benchmark

from time import perf_counter

case = make_generic_case(512, 4096, 16)

t = perf_counter()
solution1 = bruteforce_combine(case)
print(f"Brute force: {perf_counter() - t}")

t = perf_counter()
solution2 = combine(case)
print(f"Mine: {perf_counter() - t}")

print(f"Valid: {solution1 == solution2}")

字符串

nhhxz33t

nhhxz33t3#

您可以重用基因组算术包,如BEDOPS。您正在尝试做的是在BEDOPS中解决低内存开销的问题,该问题大约在2012年发布。
你可以在“伪染色体”或假的集合名称上考虑你的间隔。这不是什么大不了的事,因为把所有的数据放在一个假的“染色体”上相当于把所有的间隔放在一个集合中,以便于集合操作。
您可能会使用BEDOPS bedopsbedmap工具的组合,即比如:

$ bedops --merge setA.txt setB.txt ... setN.txt > merge.txt
$ bedops --everything setA.txt setB.bxt ... setN.txt > union.txt
$ bedmap --echo --echo-map --delim '\t' merge.txt union.txt > answer.txt

字符串
试图在Python中重新发明轮子,当有11年的老软件已经解决了这个问题时,可能不是很好地利用你的时间。考虑查看基因组间隔集操作工具包。

相关问题