Downloading multiple S3 objects in parallel in Python

oxiaedzo · posted 2023-03-11 in Python

Is there a way to download S3 files concurrently in Python 3 using boto3? I know about the aiobotocore library, but I'd like to know whether it can be done with the standard boto3 library.

fslejnso1#

If you want to use boto3 to download many small files in parallel directly to disk, you can do it with the multiprocessing module. Here is a small snippet that does exactly that; you run it like this: ./download.py bucket_name s3_key_0 s3_key_1 ... s3_key_n

#!/usr/bin/env python3
import multiprocessing
import boto3
import sys

# make a per-process s3 client
s3_client = None
def initialize():
    global s3_client
    s3_client = boto3.client('s3')

# the work function of each process, which fetches one object from s3
def download(job):
    bucket, key, filename = job
    s3_client.download_file(bucket, key, filename)

if __name__ == '__main__':
    # build the jobs; arguments to the program are: bucket s3_key_0 s3_key_1 ... s3_key_n
    bucket = sys.argv[1]
    jobs = [(bucket, key, key.replace('/', '_')) for key in sys.argv[2:]]

    # make a process pool to do the work
    pool = multiprocessing.Pool(multiprocessing.cpu_count(), initialize)
    pool.map(download, jobs)
    pool.close()
    pool.join()

One important detail is that we create one instance of the s3 client per process, and each process reuses its own. This matters for two reasons. First, creating a client is slow, so we want to do it as rarely as possible. Second, clients should not be shared across processes, because calls to download_file may mutate the client's internal state.
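
If processes feel heavyweight: the boto3 documentation notes that low-level clients (unlike resources and sessions) are generally thread-safe, so a thread pool sharing a single client is an alternative. A minimal sketch under that assumption, taking the same command-line arguments as the script above:

#!/usr/bin/env python3
from concurrent.futures import ThreadPoolExecutor
import boto3
import sys

if __name__ == '__main__':
    bucket = sys.argv[1]
    keys = sys.argv[2:]
    # one client shared by every thread; clients are documented as thread-safe
    s3_client = boto3.client('s3')

    def download(key):
        s3_client.download_file(bucket, key, key.replace('/', '_'))

    # downloads are I/O-bound, so the pool can be larger than the CPU count
    with ThreadPoolExecutor(max_workers=16) as executor:
        # list() drains the iterator so any worker exception surfaces here
        list(executor.map(download, keys))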

6yt4nkrj2#

The snippet below lets you download multiple objects from s3 using multiprocessing:

import boto3
import multiprocessing as mp
import os

# module-level resource: each worker process ends up with its own copy
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('My_bucket')

def s3download(object_key_file):
    my_bucket.download_file(object_key_file[0], object_key_file[1])
    print('downloaded file with object name... {}'.format(object_key_file[0]))
    print('downloaded file with file name... {}'.format(object_key_file[1]))

def parallel_s3_download():
    object_key_file = []
    for s3_object in my_bucket.objects.filter(Prefix="directory_name/"):
        # split s3_object.key into path and file name; otherwise download_file
        # raises a "file not found" error because the local directory does not exist
        path, filename = os.path.split(s3_object.key)
        object_key_file.append((s3_object.key, filename))
    # drop the first entry: the prefix listing usually includes the zero-byte
    # "folder" placeholder key itself, which cannot be downloaded as a file
    object_key_file.pop(0)
    pool = mp.Pool(min(mp.cpu_count(), len(object_key_file)))  # number of workers
    pool.map(s3download, object_key_file, chunksize=1)
    pool.close()
    pool.join()

if __name__ == "__main__":
    parallel_s3_download()
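
As an aside: when the goal is to speed up one large object rather than many small ones, boto3's built-in transfer manager already parallelizes the download with concurrent ranged GETs. A minimal sketch, with a hypothetical bucket, key, and filename:

import boto3
from boto3.s3.transfer import TransferConfig

s3 = boto3.client('s3')

# objects above multipart_threshold are fetched in up to max_concurrency
# concurrent chunks (threads managed by boto3 itself)
config = TransferConfig(multipart_threshold=8 * 1024 * 1024, max_concurrency=10)

# hypothetical bucket, key, and local filename
s3.download_file('my-bucket', 'path/to/large_file.bin', 'large_file.bin', Config=config)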
35g0bw713#

Given the unclear thread-safety status of boto3.Client, here is one way to do it with multiprocessing in Python >= 3.7:

import os
from multiprocessing import Pool
from typing import Generator, Iterable, List
from urllib.parse import urlparse

import boto3
from jsonargparse import CLI

def batcher(iterable: Iterable, batch_size: int) -> Generator[List, None, None]:
    """Batch an iterator. The last item might be of smaller len than batch_size.

    Args:
        iterable (Iterable): Any iterable that should be batched
        batch_size (int): Len of the generated lists

    Yields:
        Generator[List, None, None]: List of items in iterable
    """
    batch = []
    counter = 0
    for i in iterable:
        batch.append(i)
        counter += 1
        if counter % batch_size == 0:
            yield batch
            batch = []
    if len(batch) > 0:
        yield batch

def download_batch(batch):
    # create a fresh client inside the worker; nothing is shared across processes
    s3 = boto3.client("s3")
    n = 0
    for line in batch:
        dst, line = line
        url = urlparse(line)
        url_path = url.path.lstrip("/")
        folder, basename = os.path.split(url_path)
        # mirror the s3 folder structure locally
        out_dir = os.path.join(dst, folder)
        os.makedirs(out_dir, exist_ok=True)
        filepath = os.path.join(out_dir, basename)
        print(f"{filepath}")
        s3.download_file(url.netloc, url_path, filepath)
        n += 1
    return n

def file_reader(fp, dst):
    with open(fp) as f:
        for line in f:
            line = line.rstrip("\n")
            yield dst, line

def copy_cli(txt_path: str, dst: str = os.getcwd(), n_cpus: int = os.cpu_count()):
    """Copy files from s3 based on a list of urls. The output folder structure follows
    the s3 path.

    Args:
        txt_path (str): path to your list of files. One url per line.
        dst (str): path to store the files.
        n_cpus (int): number of simultaneous batches. Defaults to the number of cpus in
         the computer.
    """
    total_files = sum([1 for _ in file_reader(txt_path, dst)])
    n_cpus = min(total_files, n_cpus)
    batch_size = total_files // n_cpus
    with Pool(processes=n_cpus) as pool:
        for n in pool.imap_unordered(
            download_batch, batcher(file_reader(txt_path, dst), batch_size)
        ):
            pass

if __name__ == "__main__":
    CLI(copy_cli)

Usage

pip install jsonargparse boto3

my_list.txt (one s3:// URL per line; the paths below are hypothetical):

s3://some-bucket/path/to/file_0.txt
s3://some-bucket/path/to/file_1.txt
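
jsonargparse builds the command line from the copy_cli signature: txt_path is a required positional argument, while dst and n_cpus become optional flags. Assuming the script above was saved as copy.py (the filename is hypothetical):

python copy.py my_list.txt --dst ./downloads --n_cpus 8
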
I hope it helps 😊. You can find the same code in this repo: https://github.com/fcossio/s3-selective-copy
