我尝试使用Python从Amazon S3导入一个大尺寸的JSON文件到AWS RDS-PostgreSQL。但是,出现了这些错误,
回溯(最近的呼叫排在最后):
文件“my_code.py”,第67行,位于
文件内容= obj ['正文'].read().decode('utf-8').splitlines(真)
文件“/home/user/asd到qwe/fgh-to-hjk/env/local/lib/python3.6/站点包/botocore/response.py“,第76行,在读取
数据块=自身.原始数据流.读取(数量)
文件“/home/user/asd到qwe/fgh-to-hjk/env/local/lib/python3.6/站点包/botocore/vendored/requests/packages/urllib 3/response.py“,第239行,在读取
读取数据
文件“/usr/lib 64/python3.6/http/client.py“,第462行,在读取
s =自身安全读取(自身长度)
文件“/usr/lib 64/python3.6/http/client.py“,第617行,在_safe_read中
返回B”".join
内存错误
//我的代码.py
import sys
import boto3
import psycopg2
import zipfile
import io
import json
s3 = boto3.client('s3', aws_access_key_id=<aws_access_key_id>, aws_secret_access_key=<aws_secret_access_key>)
connection = psycopg2.connect(host=<host>, dbname=<dbname>, user=<user>, password=<password>)
cursor = connection.cursor()
bucket = sys.argv[1]
key = sys.argv[2]
obj = s3.get_object(Bucket=bucket, Key=key)
def insert_query(data):
query = """
INSERT INTO data_table
SELECT
(src.test->>'url')::varchar, (src.test->>'id')::bigint,
(src.test->>'external_id')::bigint, (src.test->>'via')::jsonb
FROM (SELECT CAST(%s AS JSONB) AS test) src
"""
cursor.execute(query, (json.dumps(data),))
if key.endswith('.zip'):
zip_files = obj['Body'].read()
with io.BytesIO(zip_files) as zf:
zf.seek(0)
with zipfile.ZipFile(zf, mode='r') as z:
for filename in z.namelist():
with z.open(filename) as f:
for line in f:
insert_query(json.loads(line.decode('utf-8')))
if key.endswith('.json'):
file_content = obj['Body'].read().decode('utf-8').splitlines(True)
for line in file_content:
insert_query(json.loads(line))
connection.commit()
connection.close()
这些问题有什么解决办法吗?任何帮助都可以,非常感谢!
1条答案
按热度按时间f0brbegy1#
通过避免将整个输入文件作为
list
行写入内存,可以节省大量的内存。具体来说,这些行在内存使用方面非常糟糕,因为它们涉及到一个
bytes
对象的峰值内存使用,该对象的大小相当于整个文件的大小,再加上list
行,其中包含文件的完整内容:在64位Python 3.3+上,对于一个1GB的ASCII文本文件,包含500万行代码,对于
bytes
对象、list
和list
中的单个str
,峰值内存需求大约为2.3GB,一个需要2.3倍于它所处理的文件大小的内存的程序将无法扩展到大文件。要修复此问题,请将原始代码更改为:
假设
obj['Body']
看起来可以用于延迟流,这应该会从内存中删除完整文件数据的 * 两个 * 副本。使用TextIOWrapper
意味着obj['Body']
被延迟读取并以块的形式解码(一次几KB),并且行也被延迟迭代;这将存储器需求减少到小的、基本固定的量(峰值存储器成本将取决于最长行的长度),而不管文件大小。更新日期:
看起来
StreamingBody
并没有实现io.BufferedIOBase
ABC。但是它确实有its own documented API,可以用于类似的目的。如果你不能让TextIOWrapper
为你做工作(如果它能工作的话,效率和简单得多),另一种选择是:与使用
TextIOWrapper
不同,它不能从块的批量解码中获益(每行单独解码),但在减少内存使用方面,它仍然可以实现相同的优势。