并行下载在python线程中不起作用

dced5bon  于 2023-02-18  发布在  Python
关注(0)|答案(1)|浏览(145)

我正在使用threading模块构建一个并行下载库。
当我使用媒体库时,它会正确下载文件,但视频文件的内容与我通过浏览器下载的内容不同
我使用threading进行并行下载,我认为threading.Lockfile.seek有问题,但我不知道如何解决这个问题。
这是我的代码:

import requests
import threading
from tqdm import tqdm

DOWNLOAD_CHUNK_SIZE = 1 << 20 # 1 MiB

class DownloadPart:
    def __init__(self, url, byte_range) -> None:
        self.url = url
        self.byte_range = byte_range

        self.lock = threading.Lock()

    def download(self, file, pbar=None):
        response = requests.get(
            self.url,
            headers={"Range": "bytes={}-{}".format(*self.byte_range)},
            allow_redirects=True,
            stream=True,
        )

        written = 0

        for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
            if chunk:
                self.lock.acquire()

                file.seek(self.byte_range[0] + written)
                length = file.write(chunk)
                file.flush()

                written += length
                pbar.update(length)

                self.lock.release()

class Downloader:
    def __init__(self, url, parts=10):
        self.url = url
        self.parts = parts

    def _get_file_size(self) -> int:
        info = requests.head(self.url, allow_redirects=True)

        info.raise_for_status()

        size = info.headers.get("content-length", None)
        assert size
        return int(size)

    def download(self, filename):
        file_size = self._get_file_size()
        # file_size = 1024

        size_per_part = file_size // self.parts

        print(file_size, size_per_part)

        file = open(filename, "wb")

        pbar = tqdm(total=file_size)

        threads = []
        for index in range(self.parts):
            # fix last part have more bytes
            if index + 1 == self.parts:
                byte_range = (size_per_part * index, file_size - 1)
            else:
                byte_range = (size_per_part * index, size_per_part * (index + 1) - 1)

            thread = threading.Thread(
                target=DownloadPart(self.url, byte_range).download, args=(file,), kwargs={"pbar": pbar}
            )
            thread.start()
            threads.append(thread)

        for thread in threads:
            thread.join()

        file.close()

URL = "https://s-delivery38.mxdcontent.net/v/8a5f59673042ed97c402be84ceeb20d9.mp4?s=TfiDzO2oBLrhub_GhToCiQ&e=1676489987&_t=1676476332"

d = Downloader(URL)

d.download("video.mp4")

如何解决我的库的问题并在文件中获得相同的数据?谢谢您的帮助。

qjp7pelc

qjp7pelc1#

我的代码有两个问题:
1.我在这里找到了第一个问题的解决方案。https://stackoverflow.com/a/25165183/14900791
Lock()函数创建了一个全新的锁--只有调用该函数的线程才能使用,这就是它不起作用的原因,因为每个线程都锁定了一个完全不同的锁。

  1. Mixdrop(mxdcontent.net)只允许两个视频在同一个IP,所以代码只适用于两个部分,其他得到状态代码509(我没有检查状态代码,所以我没有得到一个错误).
import requests
import threading
from tqdm import tqdm

DOWNLOAD_CHUNK_SIZE = 1 << 20 # 1 MiB

# global lock instance
lock = threading.Lock()

class DownloadPart:
    def __init__(self, url, byte_range) -> None:
        self.url = url
        self.byte_range = byte_range

    def download(self, file, pbar=None):
        response = requests.get(
            self.url,
            headers={"Range": "bytes={}-{}".format(*self.byte_range)},
            allow_redirects=True,
            stream=True,
        )

        written = 0

        for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
            if chunk:
                lock.acquire()

                file.seek(self.byte_range[0] + written)
                length = file.write(chunk)
                file.flush()

                written += length
                pbar.update(length)

                lock.release()

class Downloader:
    def __init__(self, url, parts=10):
        self.url = url
        self.parts = parts

    def _get_file_size(self) -> int:
        info = requests.head(self.url, allow_redirects=True)

        info.raise_for_status()

        size = info.headers.get("content-length", None)
        assert size
        return int(size)

    def download(self, filename):
        file_size = self._get_file_size()
        # file_size = 1024

        size_per_part = file_size // self.parts

        print(file_size, size_per_part)

        file = open(filename, "wb")

        pbar = tqdm(total=file_size)

        threads = []
        for index in range(self.parts):
            # fix last part have more bytes
            if index + 1 == self.parts:
                byte_range = (size_per_part * index, file_size - 1)
            else:
                byte_range = (size_per_part * index, size_per_part * (index + 1) - 1)

            thread = threading.Thread(
                target=DownloadPart(self.url, byte_range).download, args=(file,), kwargs={"pbar": pbar}
            )
            thread.start()
            threads.append(thread)

        for thread in threads:
            thread.join()

        file.close()

URL = "https://s-delivery38.mxdcontent.net/v/8a5f59673042ed97c402be84ceeb20d9.mp4?s=TfiDzO2oBLrhub_GhToCiQ&e=1676489987&_t=1676476332"

d = Downloader(URL)

d.download("video.mp4")

相关问题