#!/usr/bin/env python3
import multiprocessing
import boto3
import sys
# make a per process s3_client
s3_client = None
def initialize():
global s3_client
s3_client = boto3.client('s3')
# the work function of each process which will fetch something from s3
def download(job):
bucket, key, filename = job
s3_client.download_file(bucket, key, filename)
if __name__ == '__main__':
# make the jobs, arguments to program are: bucket s3_key_0 s3_key_1 ... s3_key_n
bucket = sys.argv[1]
jobs = [(bucket, key, key.replace('/', '_')) for key in sys.argv[2:] ]
# make a process pool to do the work
pool = multiprocessing.Pool(multiprocessing.cpu_count(), initialize)
pool.map(download, jobs)
pool.close()
pool.join()
import boto3
import multiprocessing as mp
import os
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('My_bucket')
def s3download(object_key_file):
my_bucket.download_file(object_key_file[0], object_key_file[1])
print('downloaded file with object name... {}'.format(object_key_file[0]))
print('downloaded file with file name... {}'.format(object_key_file[1]))
def parallel_s3_download():
object_key_file=[]
for s3_object in my_bucket.objects.filter(Prefix="directory_name/"):
# Need to split s3_object.key into path and file name, else it will give error file not found.
path, filename = os.path.split(s3_object.key)
object_key_file.append((s3_object.key,filename))
object_key_file.pop(0)
pool = mp.Pool(min(mp.cpu_count(), len(object_key_file))) # number of workers
pool.map(s3download, object_key_file, chunksize=1)
pool.close()
if __name__ == "__main__":
parallel_s3_download()
print('downloading zip file')
import os
from multiprocessing import Pool
from typing import Generator, Iterable, List
from urllib.parse import urlparse
import boto3
from jsonargparse import CLI
def batcher(iterable: Iterable, batch_size: int) -> Generator[List, None, None]:
"""Batch an iterator. The last item might be of smaller len than batch_size.
Args:
iterable (Iterable): Any iterable that should be batched
batch_size (int): Len of the generated lists
Yields:
Generator[List, None, None]: List of items in iterable
"""
batch = []
counter = 0
for i in iterable:
batch.append(i)
counter += 1
if counter % batch_size == 0:
yield batch
batch = []
if len(batch) > 0:
yield batch
def download_batch(batch):
s3 = boto3.client("s3")
n = 0
for line in batch:
dst, line = line
url = urlparse(line)
url_path = url.path.lstrip("/")
folder, basename = os.path.split(url_path)
dir = os.path.join(dst, folder)
os.makedirs(dir, exist_ok=True)
filepath = os.path.join(dir, basename)
print(f"{filepath}")
s3.download_file(url.netloc, url_path, filepath)
n += 1
return n
def file_reader(fp, dst):
with open(fp) as f:
for line in f:
line = line.rstrip("\n")
yield dst, line
def copy_cli(txt_path: str, dst: str = os.getcwd(), n_cpus: int = os.cpu_count()):
"""Copy files from s3 based on a list of urls. The output folder structure follows
the s3 path.
Args:
txt_path (str): path to your list of files. One url per line.
dst (str): path to store the files.
n_cpus (int): number of simultaneous batches. Defaults to the number of cpus in
the computer.
"""
total_files = sum([1 for _ in file_reader(txt_path, dst)])
print(n_cpus)
n_cpus = min(total_files, n_cpus)
batch_size = total_files // n_cpus
with Pool(processes=n_cpus) as pool:
for n in pool.imap_unordered(
download_batch, batcher(file_reader(txt_path, dst), batch_size)
):
pass
if __name__ == "__main__":
CLI(copy_cli)
3条答案
按热度按时间fslejnso1#
如果你想用
boto3
把很多小文件并行地直接下载到磁盘上,你可以用multiprocessing
模块来实现,这里有一个小代码段可以做到这一点,你可以这样运行它:./download.py bucket_name s3_key_0 s3_key_1 ... s3_key_n
其中一个重要的部分是我们为每个进程创建一个s3客户端的示例,每个进程都将重用它。这有两个重要原因。首先,创建客户端很慢,所以我们希望尽可能少地这样做。其次,客户端不应该在进程之间共享,因为调用
download_file
可能会改变客户端的内部状态。6yt4nkrj2#
下面的代码片段允许您使用多处理从s3下载多个对象
35g0bw713#
面对
boto3.Client
未知的线程安全状态,下面是在python〉=3.7中使用多处理的一种方法用法
my_list.txt
一个二个一个一个
我希望它能有所帮助😊。你可以在这个repo https://github.com/fcossio/s3-selective-copy中找到相同的代码