我有来自多个csv文件的数据,我需要将表合并到一个表中。所讨论的数据是GeoLite 2数据库的文本转储,实际上有数百万行,简单地将数据加载到列表中需要2927 MiB。
这些表包含关于IP网络的信息,一些表包含关于ASN的信息,一些关于城市的信息,还有一些关于国家的信息,这些表具有不同的键(IP网络),并且它们可能包含公共键,我打算将这些表合并到一个表中,其中包含关于所列所有网络的ASN、国家和城市的信息。
这个问题和我以前的question有关,但它是不同的。
想象一个无限的盒子排列成一行,它们使用唯一的整数编号,并且所有的盒子最初都是空的。这一次,所有的盒子都可以容纳无限多个值,但它们只能容纳唯一的值。也就是说,如果你把A放进盒子0,盒子0就包含了A,但是在那之后,无论你把A放进盒子0多少次,盒子0总是恰好包含A的1个示例。但是如果你把B放进盒子0,盒子0就包含了A和B。但是如果你再把B放进盒子里,盒子0仍然包含A的1个示例和B的1个示例。
现在有很多三元组,前两个元素是整数,它们对应一个整数范围的开始和结束(含),每个三元组描述了一个连续的整数范围的盒子(意味着每个盒子的数目是前一个盒子的数目加1)与同一对象。
例如,(0, 10, 'A')
意味着框0到10包含'A'
的示例。
任务是合并来自三元组的信息,并以最少数量的三元组描述盒子的状态,在这种情况下,第三个元素是set
s。
输入(0, 10, 'A')
->输出(0, 10, {'A'})
,说明:框0到10包含'A'
的示例。
输入(0, 10, 'A'), (11, 20, 'A')
->输出(0, 20, {'A'})
,说明:框0到10包含'A'
的示例,框11到20也包含'A'
的示例,11是10 + 1,所以框0到20包含'A'
的示例。
输入(0, 10, 'A'), (20, 30, 'A')
->输出(0, 10, {'A'}), (20, 30, {'A'})
,说明:框0到10包含一个'A'
的示例,框20到30也包含一个'A'
的示例,所有其他框都是空的,并且20与10不相邻,不要合并。
输入(0, 10, 'A'), (11, 20, 'B')
->输出(0, 10, {'A'}), (11, 20, {'B'})
输入(0, 10, 'A'), (2, 8, 'B')
->输出(0, 1, {'A'}), (2, 8, {'A', 'B'}), (9, 10, {'A'})
,说明:框0到10具有'A'
,而框2到8具有'B'
,因此框2到8具有{'A', 'B'}
。
输入(0, 10, 'A'), (5, 20, 'B')
->输出(0, 4, {'A'}), (5, 10, {'A', 'B'}), (11, 20, {'B'})
说明:同上。
输入(0, 10, 'A'), (5, 10, 'A')
->输出(0, 10, {'A'})
,说明:框0到10具有'A'
,第二个三元组没有添加新信息,并且是垃圾,丢弃它。
我当前的代码为一些测试用例生成正确的输出,但为其他测试用例生成KeyError
:
import random
from collections import defaultdict
from typing import Any, List, Tuple
def get_nodes(ranges: List[Tuple[int, int, Any]]) -> List[Tuple[int, int, Any]]:
nodes = []
for ini, fin, data in ranges:
nodes.extend([(ini, False, data), (fin, True, data)])
return sorted(nodes)
def combine_gen(ranges):
nodes = get_nodes(ranges)
stack = set()
actions = []
for node, end, data in nodes:
if not end:
if (action := (data not in stack)):
if stack and start < node:
yield start, node - 1, stack.copy()
stack.add(data)
start = node
actions.append(action)
elif actions.pop(-1):
if start <= node:
yield start, node, stack.copy()
start = node + 1
stack.remove(data)
def merge(segments):
start, end, data = next(segments)
for start2, end2, data2 in segments:
if end + 1 == start2 and data == data2:
end = end2
else:
yield start, end, data
start, end, data = start2, end2, data2
yield start, end, data
def combine(ranges):
return list(merge(combine_gen(ranges)))
字符串
它为以下测试用例生成正确的输出:
sample1 = [(0, 20, 'A'), (10, 40, 'B'), (32, 50, 'C'), (40, 50, 'D'), (45, 50, 'E'), (70, 80, 'F'), (90, 100, 'G'), (95, 120, 'H'), (131, 140, 'I'), (140, 150, 'J')]
sample2 = [(0, 10, 'A'), (0, 1, 'B'), (2, 5, 'C'), (3, 4, 'C'), (6, 7, 'C'), (8, 8, 'D'), (110, 150, 'E'), (250, 300, 'C'), (256, 270, 'D'), (295, 300, 'E'), (500, 600, 'F')]
sample3 = [(0, 100, 'A'), (10, 25, 'B'), (15, 25, 'C'), (20, 25, 'D'), (30, 50, 'E'), (40, 50, 'F'), (60, 80, 'G'), (150, 180, 'H')]
sample4 = [(0, 16, 'red'), (0, 4, 'green'), (2, 9, 'blue'), (2, 7, 'cyan'), (4, 9, 'purple'), (6, 8, 'magenta'), (9, 14, 'yellow'), (11, 13, 'orange'), (18, 21, 'green'), (22, 25, 'green')]
型
我不会在这里包括它们的预期输出,运行我的代码,你会发现输出是什么,输出是正确的。
我写了一个函数来制作测试用例和一个保证正确但低效的解决方案,我的高效代码在馈送机器生成输入时会引发KeyError
。
def make_generic_case(num, lim, dat):
ranges = []
for _ in range(num):
start = random.randrange(lim)
end = random.randrange(lim)
if start > end:
start, end = end, start
ranges.append([start, end, random.randrange(dat)])
ranges.sort(key=lambda x: (x[0], -x[1]))
return ranges
def bruteforce_combine(ranges):
boxes = defaultdict(set)
for start, end, data in ranges:
for n in range(start, end + 1):
boxes[n].add(data)
boxes = sorted(boxes.items())
output = []
lo, cur = boxes.pop(0)
hi = lo
for n, data in boxes:
if cur == data and n - hi == 1:
hi = n
else:
output.append((lo, hi, cur))
lo = hi = n
cur = data
output.append((lo, hi, cur))
return output
型
因为我的代码 * 不能正常工作,所以我不能在Code Review* 上发布它,因为Code Review只审查工作的代码,而我的不是。
- 使用
make_generic_case(512, 4096, 16)
获得测试用例并验证所建议解决方案是否正确,bruteforce_combine
的输出需要答案 *,bruteforce_combine
根据定义是正确的(我的逻辑是defaultdict(set)
)。
合并重叠范围的更有效方法是什么?
现有的两个答案都不理想,第一个给出了正确的结果,但效率非常低,永远不会处理我的数百万行:
In [5]: for _ in range(256):
...: case = make_generic_case(512, 4096, 16)
...: assert bruteforce_combine(case) == combine(case)
In [6]: case = make_generic_case(512, 4096, 16)
In [7]: %timeit combine(case)
9.3 ms ± 35 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
型
第二种方法更有效,但我还没有彻底测试过。
我已经从第二个答案中确认了代码的正确性,并将其重写为以下内容:
from collections import Counter
def get_nodes(ranges):
nodes = []
for start, end, label in ranges:
nodes.extend(((start, 0, label), (end + 1, 1, label)))
return sorted(nodes)
def combine(ranges):
if not ranges:
return []
nodes = get_nodes(ranges)
labels = set()
state = Counter()
result = []
start = nodes[0][0]
for node, is_end, label in nodes:
state[label] += [1, -1][is_end]
count = state[label]
if (is_end, count) in {(0, 1), (1, 0)}:
if start < node:
if not count or labels:
result.append((start, node - 1, labels.copy()))
start = node
(labels.remove, labels.add)[count](label)
return result
型
它仍然非常低效,我需要处理数百万行:
In [2]: for _ in range(128):
...: case = make_generic_case(256, 4096, 16)
...: assert bruteforce_combine(case) == combine(case)
In [3]: for _ in range(2048):
...: case = make_generic_case(512, 2048, 16)
...: assert bruteforce_combine(case) == combine(case)
In [4]: case = make_generic_case(2048, 2**64, 32)
In [5]: %timeit combine(case)
4.19 ms ± 112 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
In [6]: case = make_generic_case(32768, 2**64, 32)
In [7]: %timeit combine(case)
116 ms ± 1.11 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
In [8]: case = make_generic_case(1048576, 2**64, 32)
In [9]: %timeit combine(case)
5.12 s ± 30.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
型
我有来自6个巨大CSV文件的数据,总行数是:
In [74]: 495209+129884+3748518+1277097+429639+278661
Out[74]: 6359008
型
这是远远超过600万,仅将数据加载到RAM需要2.9GiB,而我只有16 GiB RAM。我需要一个在时间复杂度和空间复杂度上都更有效的解决方案。
3条答案
按热度按时间uurity8g1#
这里有一个简单的策略。
您将从
(start, end, label)
形式的元组列表开始。将每个元组转换为2:
(start, 0, label), (end+1, 1, label)
的值。排序元组。请注意,这将在下一个框的开始之后在上一个框上放置一个结束,以便我们可以合并范围。
通过他们,并产生答案。请注意,我们总是发出在我们当前处理的框之前结束的范围。
对于一个大型数据集,我实际上会将中间表单放入一个文件中,在Python之外对其进行排序,然后从那里加载。但这是一个实施细节。下面是一些示例代码。
字符串
ufj5ltwl2#
基于@InSync的提示,我创建了一个解决方案,它将关键点收集在字典中并对其进行处理:
字符串
nhhxz33t3#
您可以重用基因组算术包,如BEDOPS。您正在尝试做的是在BEDOPS中解决低内存开销的问题,该问题大约在2012年发布。
你可以在“伪染色体”或假的集合名称上考虑你的间隔。这不是什么大不了的事,因为把所有的数据放在一个假的“染色体”上相当于把所有的间隔放在一个集合中,以便于集合操作。
您可能会使用BEDOPS
bedops
和bedmap
工具的组合,即比如:字符串
试图在Python中重新发明轮子,当有11年的老软件已经解决了这个问题时,可能不是很好地利用你的时间。考虑查看基因组间隔集操作工具包。