用例:给定一个约2GB的.gz文件,用换行符分隔json,操作每行并将输出写入zip文件(csv)
问题:我使用的环境有~1GB的内存,我没有传统的文件系统访问权限。我可以写入文件的唯一方法是将整个数据流作为单个对象从内存中传递(我不能循环生成器并写入文件)
到目前为止,我的方法是循环遍历.gz文件中的数据,修改数据,然后在内存中压缩数据,并在处理完所有数据后将其写出来。当我使用分块并且不操纵数据时,这是可行的。然而,当我尝试一次做一行时,它似乎无限期地运行,不起作用。
gzip数据示例:
{"ip": "1.1.1.1", "org": "cloudflare"}
{"ip": "2.2.2.2", "org": "chickenNugget"}
注意:这不是真正的json,每行都是有效的json,但这不是一个数组
目标输出:
value,description
1.1.1.1, cloudflare
2.2.2.2, chickenNugget
使用分块在几秒钟内工作的示例:
import gzip
chunksize = 100 * 1024 * 1024
with gzip.open('latest.json.gz', 'rt', encoding='utf8') as f:
while True:
chunk = f.read(chunksize)
if not chunk:
break
compressed += gzip.compress(chunk.encode())
# I am able to use platforms internal file creation
# process to create a zip with "compressed" variable - the issue here is that I cannot
# reliably manipulate the data.
我尝试过但不起作用
import gzip
compressed = 'value,description,expiration,active\n'.encode()
with gzip.open('latest.json.gz', 'rt', encoding='utf8') as f:
for line in f:
obj = json.loads(line)
data = f'{obj.get("ip")}{obj.get("organization")},,True\n'
compressed += gzip.compress(data.encode())
# This code never seems to complete - I gave up after running for 3+ hours
EDIT当我在无约束环境中测试第二个示例时,它也会永远运行。但是,如果我像下面这样修改代码,使其在10 k行之后中断,它将按预期工作
...
count = 0
for line in f:
if count > 10000: break
...
count += 1
有没有更好的方法来解决这个问题?
2条答案
按热度按时间zd287kbt1#
你的
compressed +=
是罪魁祸首。Python每次连接到compressed
时都会复制它。这需要O(n k)的时间,而不是O(n),其中 n 是输出的总大小,k 是被连接的片段的数量。(注意,k 对于100MB的块来说很小,但是对于行来说非常大。)您需要将这些数据写出来或者对它做一些事情,而不是试图将它累积在内存中的一个巨大的字符串中。如果你绝对不能避免在内存中产生一个巨大的字符串,那么你需要为压缩数据分配空间作为一个可变数组,并将其复制到该空间中,而不是连接。为了不需要提前知道结果有多大,一旦你得到一个压缩的结果,它会超过你剩下的空间,* 加倍 * 空间的大小,然后复制到那里。只要你每次都加倍(或者将大小乘以某个因子),复制所花费的时间将是O(n)。
Python中最简单的方法是使用BytesIO,它应该可以为您完成上述任务。只需要将压缩数据写入字符串,就像它是一个文件一样。
正如我在上面的评论中提到的,不要把每一行单独压缩为它自己的gzip流。您将不会压缩数据。您将扩展数据。相反,创建一个
zlib.compressobj
,将行发送给它,并在生成压缩数据时将其取回。h9vpoimq2#
这是不可能的,因为转换后的gzip字节的大小将大于1GB。如果从解压缩的文本中删除花括号、双引号、冒号和键,并重新压缩转换后的文本,则大小将接近原始大小的100%,永远不会接近50%,因为每行中删除的字符不会更改。
无论如何,您可以通过使用
GZipFile
流更有效地使用内存并更快地压缩,如下所示。(这只压缩数据一次,因为Mr。阿德勒说。)如果可以使用多个内核,可以使用两个线程或进程来优化此功能。(一个用于解压缩数据并将JSON行转换为CSV行,另一个用于压缩数据)