curl: downloading a large file in parts using multiple parallel threads

Asked by pu82cl6c on 2022-11-13

I have a use case that requires downloading a large remote file with multiple threads. Each thread must run at the same time (in parallel), grabbing a specific part of the file. The expectation is that, once all parts have been downloaded successfully, they are merged back into a single (the original) file.
Perhaps the requests library could do the job, but I'm not sure how to turn that into a multithreaded solution that combines the chunks afterwards.

from requests import get

url = 'https://url.com/file.iso'
headers = {"Range": "bytes=0-1000000"}  # first megabyte
r = get(url, headers=headers)

I was also considering using curl, with Python orchestrating the downloads, but I'm not sure that's the right way to go. It seems too complicated and strays from a plain Python solution. Something like:

curl --range 200000000-399999999 -o file.iso.part2
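
To make the idea concrete, the orchestration I have in mind would look roughly like this (just a sketch; it assumes curl is on the PATH and that the total file size is already known, say from a HEAD request):

import subprocess
from concurrent.futures import ThreadPoolExecutor

URL = 'https://url.com/file.iso'
FILE_SIZE = 400000000  # assumed known up front
PARTS = 4

def fetch_part(args):
    i, start, end = args
    # One curl process per byte range; check=True raises on failure.
    subprocess.run(
        ['curl', '-s', '--range', f'{start}-{end}', '-o', f'file.iso.part{i}', URL],
        check=True,
    )

part_size = FILE_SIZE // PARTS
ranges = [
    (i, i * part_size, FILE_SIZE - 1 if i == PARTS - 1 else (i + 1) * part_size - 1)
    for i in range(PARTS)
]

with ThreadPoolExecutor(max_workers=PARTS) as pool:
    list(pool.map(fetch_part, ranges))  # list() forces completion and surfaces errors

# Concatenate the parts in order.
with open('file.iso', 'wb') as out:
    for i in range(PARTS):
        with open(f'file.iso.part{i}', 'rb') as part:
            out.write(part.read())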

Can anyone explain how you would go about doing something like this, or post a code example that works in Python 3? I can usually track down Python-related answers on my own, but a solution to this one seems to be eluding me.

mfuanj7w 1#

Here is a version using Python 3 with asyncio. It is only an example and can be improved, but it should give you everything you need:

  • get_size: sends a HEAD request to get the size of the file
  • download_range: downloads a single chunk
  • download: downloads all the chunks and merges them

import asyncio
import concurrent.futures
import functools
import requests
import os

# WARNING:
# Here I'm pointing to a publicly available sample video.
# If you are planning on running this code, make sure the
# video is still available as it might change location or get deleted.
# If necessary, replace it with a URL you know is working.
URL = 'https://download.samplelib.com/mp4/sample-30s.mp4'
OUTPUT = 'video.mp4'

async def get_size(url):
    # Note: requests is blocking; this runs once, before the workers start.
    response = requests.head(url)
    size = int(response.headers['Content-Length'])
    return size

def download_range(url, start, end, output):
    headers = {'Range': f'bytes={start}-{end}'}
    response = requests.get(url, headers=headers)

    with open(output, 'wb') as f:
        for part in response.iter_content(1024):
            f.write(part)

async def download(run, loop, url, output, chunk_size=1000000):
    file_size = await get_size(url)
    chunks = range(0, file_size, chunk_size)

    tasks = [
        run(
            download_range,
            url,
            start,
            start + chunk_size - 1,
            f'{output}.part{i}',
        )
        for i, start in enumerate(chunks)
    ]

    await asyncio.wait(tasks)

    with open(output, 'wb') as o:
        for i in range(len(chunks)):
            chunk_path = f'{output}.part{i}'

            with open(chunk_path, 'rb') as s:
                o.write(s.read())

            os.remove(chunk_path)

if __name__ == '__main__':
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
    loop = asyncio.new_event_loop()
    run = functools.partial(loop.run_in_executor, executor)

    asyncio.set_event_loop(loop)

    try:
        loop.run_until_complete(
            download(run, loop, URL, OUTPUT)
        )
    finally:
        loop.close()
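
On Python 3.7+, the event-loop boilerplate at the bottom can be written more compactly. A minimal variant of the same entry point, assuming the functions above stay unchanged:

import asyncio
import concurrent.futures
import functools

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        run = functools.partial(loop.run_in_executor, executor)
        await download(run, loop, URL, OUTPUT)

asyncio.run(main())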

vngu2lb8 2#

You could use grequests to download in parallel.

import grequests

URL = 'https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/debian-10.1.0-amd64-netinst.iso'
CHUNK_SIZE = 104857600  # 100 MB
HEADERS = []

_start, _stop = 0, 0
for x in range(4):  # file size is > 300 MB, so we download in 4 parts
    _start = _stop
    _stop = CHUNK_SIZE * (x + 1)
    # HTTP ranges are inclusive, so end each range one byte early
    # to avoid overlapping with the next part.
    HEADERS.append({"Range": "bytes=%s-%s" % (_start, _stop - 1)})

rs = (grequests.get(URL, headers=h) for h in HEADERS)
downloads = grequests.map(rs)

# 'wb' rather than 'ab', so a leftover file from a previous run
# doesn't get appended to.
with open('/tmp/debian-10.1.0-amd64-netinst.iso', 'wb') as f:
    for download in downloads:
        print(download.status_code)
        f.write(download.content)

PS: I did not check whether the ranges are determined correctly, or whether the md5 sum of the downloaded file matches! This is only meant to show the general principle.
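
If you do want the checksum verification mentioned above, a minimal sketch using hashlib (the expected digest below is a placeholder; take the real value from the Debian checksums file):

import hashlib

def md5sum(path, block_size=1 << 20):
    # Hash the file in blocks so a large ISO doesn't have to fit in memory.
    digest = hashlib.md5()
    with open(path, 'rb') as f:
        while True:
            block = f.read(block_size)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()

expected = '<md5 from the Debian checksums file>'  # placeholder
actual = md5sum('/tmp/debian-10.1.0-amd64-netinst.iso')
print('OK' if actual == expected else f'MISMATCH: {actual}')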

au9on6nz 3#

The best way I found is to use a module called pySmartDL.
Step 1: pip install pySmartDL
Step 2: to download a file, you can use

from pySmartDL import SmartDL
obj = SmartDL(url, destination)
obj.start()

Note: by default this gives you a download progress meter.
If you need to hook the download progress up to a GUI, you can use

import time

obj = SmartDL(url, destination, progress_bar=False)
obj.start(blocking=False)
while not obj.isFinished():
    download_percentage = round(obj.get_progress() * 100, 2)
    time.sleep(0.2)
    print(download_percentage)

If you want to use more threads, you can use

obj = SmartDL(url, destination, threads=7)  # by default, threads=5
obj.start()
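
Putting it together, a runnable example (the URL is the same sample video used in the first answer; pySmartDL handles splitting and merging the parts internally):

import time
from pySmartDL import SmartDL

url = 'https://download.samplelib.com/mp4/sample-30s.mp4'
destination = './video.mp4'

obj = SmartDL(url, destination, threads=4, progress_bar=False)
obj.start(blocking=False)
while not obj.isFinished():
    print(round(obj.get_progress() * 100, 2))
    time.sleep(0.2)
print('saved to', destination)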

You can find more features on the project pages:
Downloads: http://pypi.python.org/pypi/pySmartDL/
Documentation: http://itaybb.github.io/pySmartDL/
Project page: https://github.com/iTaybb/pySmartDL/
Bugs and issues: https://github.com/iTaybb/pySmartDL/issues

zengzsys 4#

You can also use ThreadPoolExecutor (or ProcessPoolExecutor) from concurrent.futures instead of asyncio. The following shows how to modify bug's answer (the asyncio answer above) to use a ThreadPoolExecutor:

**Bonus:** the snippet below *also* uses tqdm to show a progress bar for the download. If you don't want to use tqdm, just comment out the block under with tqdm(total=file_size . . . below. More information about tqdm is here; it can be installed with pip install tqdm. By the way, tqdm also works with asyncio.

import requests
import concurrent.futures
from concurrent.futures import as_completed
from tqdm import tqdm
import os

def download_part(url_and_headers_and_partfile):
    url, headers, partfile = url_and_headers_and_partfile
    response = requests.get(url, headers=headers)
    # Same chunk size as in the main block below (matching is not required):
    chunk_size = 1024*1024

    # Track the number of bytes written so tqdm can be updated.
    size = 0
    with open(partfile, 'wb') as f:
        for chunk in response.iter_content(chunk_size):
            if chunk:
                size += f.write(chunk)
    return size

def make_headers(start, chunk_size):
    end = start + chunk_size - 1
    return {'Range': f'bytes={start}-{end}'}

url = 'https://download.samplelib.com/mp4/sample-30s.mp4'
file_name = 'video.mp4'
# stream=True: fetch the headers now without downloading the body.
response = requests.get(url, stream=True)
file_size = int(response.headers.get('content-length', 0))
chunk_size = 1024*1024

chunks = range(0, file_size, chunk_size)
my_iter = [[url, make_headers(chunk, chunk_size), f'{file_name}.part{i}'] for i, chunk in enumerate(chunks)] 

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    jobs = [executor.submit(download_part, i) for i in my_iter]

    with tqdm(total=file_size, unit='iB', unit_scale=True, unit_divisor=1024, leave=True, colour='cyan') as bar:
        for job in as_completed(jobs):
            size = job.result()
            bar.update(size)

with open(file_name, 'wb') as outfile:
    for i in range(len(chunks)):
        chunk_path = f'{file_name}.part{i}'
        with open(chunk_path, 'rb') as s:
            outfile.write(s.read())
        os.remove(chunk_path)
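
As a quick sanity check after the merge, you can verify that the reassembled file matches the advertised Content-Length:

import os

merged_size = os.path.getsize(file_name)
assert merged_size == file_size, f'expected {file_size} bytes, got {merged_size}'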
