postgresql 使用read()方法从Amazon S3阅读大型JSON文件时出现MemoryError

dly7yett  于 2022-12-03  发布在  PostgreSQL
关注(0)|答案(1)|浏览(136)

我尝试使用Python从Amazon S3导入一个大尺寸的JSON文件到AWS RDS-PostgreSQL。但是,出现了这些错误,
回溯(最近的呼叫排在最后):
文件“my_code.py”,第67行,位于
文件内容= obj ['正文'].read().decode('utf-8').splitlines(真)
文件“/home/user/asd到qwe/fgh-to-hjk/env/local/lib/python3.6/站点包/botocore/response.py“,第76行,在读取
数据块=自身.原始数据流.读取(数量)
文件“/home/user/asd到qwe/fgh-to-hjk/env/local/lib/python3.6/站点包/botocore/vendored/requests/packages/urllib 3/response.py“,第239行,在读取
读取数据
文件“/usr/lib 64/python3.6/http/client.py“,第462行,在读取
s =自身安全读取(自身长度)
文件“/usr/lib 64/python3.6/http/client.py“,第617行,在_safe_read中
返回B”".join
内存错误
//我的代码.py

import sys
import boto3
import psycopg2
import zipfile
import io
import json

s3 = boto3.client('s3', aws_access_key_id=<aws_access_key_id>, aws_secret_access_key=<aws_secret_access_key>)
connection = psycopg2.connect(host=<host>, dbname=<dbname>, user=<user>, password=<password>)
cursor = connection.cursor()

bucket = sys.argv[1]
key = sys.argv[2]
obj = s3.get_object(Bucket=bucket, Key=key)

def insert_query(data):
    query = """
        INSERT INTO data_table
        SELECT
            (src.test->>'url')::varchar, (src.test->>'id')::bigint,
            (src.test->>'external_id')::bigint, (src.test->>'via')::jsonb
        FROM (SELECT CAST(%s AS JSONB) AS test) src
    """
    cursor.execute(query, (json.dumps(data),))

if key.endswith('.zip'):
    zip_files = obj['Body'].read()
    with io.BytesIO(zip_files) as zf:
        zf.seek(0)
        with zipfile.ZipFile(zf, mode='r') as z:
            for filename in z.namelist():
                with z.open(filename) as f:
                    for line in f:
                        insert_query(json.loads(line.decode('utf-8')))
if key.endswith('.json'):
    file_content = obj['Body'].read().decode('utf-8').splitlines(True)
    for line in file_content:
        insert_query(json.loads(line))

connection.commit()
connection.close()

这些问题有什么解决办法吗?任何帮助都可以,非常感谢!

f0brbegy

f0brbegy1#

通过避免将整个输入文件作为list行写入内存,可以节省大量的内存。
具体来说,这些行在内存使用方面非常糟糕,因为它们涉及到一个bytes对象的峰值内存使用,该对象的大小相当于整个文件的大小,再加上list行,其中包含文件的完整内容:

file_content = obj['Body'].read().decode('utf-8').splitlines(True)
for line in file_content:

在64位Python 3.3+上,对于一个1GB的ASCII文本文件,包含500万行代码,对于bytes对象、listlist中的单个str,峰值内存需求大约为2.3GB,一个需要2.3倍于它所处理的文件大小的内存的程序将无法扩展到大文件。
要修复此问题,请将原始代码更改为:

file_content = io.TextIOWrapper(obj['Body'], encoding='utf-8')
for line in file_content:

假设obj['Body']看起来可以用于延迟流,这应该会从内存中删除完整文件数据的 * 两个 * 副本。使用TextIOWrapper意味着obj['Body']被延迟读取并以块的形式解码(一次几KB),并且行也被延迟迭代;这将存储器需求减少到小的、基本固定的量(峰值存储器成本将取决于最长行的长度),而不管文件大小。

更新日期:

看起来StreamingBody并没有实现io.BufferedIOBase ABC。但是它确实有its own documented API,可以用于类似的目的。如果你不能让TextIOWrapper为你做工作(如果它能工作的话,效率和简单得多),另一种选择是:

file_content = (line.decode('utf-8') for line in obj['Body'].iter_lines())
for line in file_content:

与使用TextIOWrapper不同,它不能从块的批量解码中获益(每行单独解码),但在减少内存使用方面,它仍然可以实现相同的优势。

相关问题