Best practice for downloading blobs from an Azure container

r1zhe5dt · asked 2023-03-09

I have an Azure container holding thousands of blobs, each stored under a path of the form id/year/month/day/hour_minute_second/file.json. I want to download, in Python, every file.json for a set of ids between start_date and end_date. For this I use BlobServiceClient from the Azure Python package (azure-storage-blob). Before downloading each JSON file, I check whether the blob exists with the get_blob_client(blob=blob_dir).exists() method.
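For example, a blob for the hypothetical id 12345 at 2023-03-09 14:30:00 would sit at 12345/2023/03/09/14_30_00/file.json.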

from itertools import product

from azure.storage.blob import BlobServiceClient
import pandas as pd

class AzureContainerClient(object):
    def __init__(self, account_url="https://mystorage.blob.core.windows.net/",
                 container_name='json',
                 credential="azure"):

        self.account_url = account_url
        self.container_name = container_name
        self.credential = credential

        # Connect to the Azure storage account
        self.__connect()

    def __connect(self):
        """
        Connect to Azure container_name.
        :return:
        """

        self.blob_client_server = BlobServiceClient(account_url=self.account_url, credential=self.credential)
        self.container_client = self.blob_client_server.get_container_client(container=self.container_name)

    def close(self) -> None:
        """
        Close the connection.
        :return: None
        """

        self.blob_client_server.close()

    def is_exist(self, blob: str) -> bool:
        """
        Return True if the blob exists in self.container_name, else False.
        :param blob: blob path
        :return:
        """

        return self.container_client.get_blob_client(blob=blob).exists()

    def read_blob(self, blob: str) -> bytes:
        """
        Read the blob's raw content from container_client.
        :param blob: blob path
        :return: blob content as bytes
        """

        data = self.container_client.get_blob_client(blob=blob).download_blob().readall()

        # Load the binary data into json
        # data = json.loads(data)

        return data

def get_files(ids: list, start_date: str, end_date: str) -> pd.DataFrame:
    """
    Get the json files for ids from start_date to end_date.
    :param ids:
    :param start_date:
    :param end_date:
    :return:
    """

    date_range = pd.date_range(start=start_date, end=end_date, freq='H')

    # Generator of (id, date) pairs for each id between start_date and end_date
    stores_dates_gen = product(ids, date_range)

    azure_container_client = AzureContainerClient()

    data_list = []
    for store_id, date in stores_dates_gen:
        # Build the blob path: {id}/{year}/{month}/{day}/{hour_minute_second}/file.json
        id_date_blob = f'{store_id}/{date.strftime("%Y/%m/%d/%H_%M_%S")}/file.json'

        # Skip ids/dates for which no blob exists in the container
        if not azure_container_client.is_exist(id_date_blob):
            continue

        data = azure_container_client.read_blob(blob=id_date_blob)

        data_list.append((store_id, date, data))

    df = pd.DataFrame(data=data_list, columns=['id', 'dateTime', 'data'])

    return df

However, this takes far too long, mostly because of the existence check (azure_container_client.is_exist(id_date_blob)). What is the fastest way to download all existing blobs?


8iwquhpp #1

Instead of checking whether each blob exists, you can try to read the blob directly. In other words, you can drop the following lines:

if not azure_container_client.is_exist(id_date_blob):
    continue

If the blob does not exist, the read operation will raise an error.
All you need to do is catch that error and check its status code: a 404 means the blob you tried to read does not exist, and you simply continue with the next blob in the list.
This will cut the runtime considerably, because you have halved the number of network round trips. A sketch of the rewritten loop follows below.
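
A minimal sketch of the loop rewritten this way, reusing the AzureContainerClient class and the stores_dates_gen generator from the question. With azure-storage-blob, reading a missing blob raises azure.core.exceptions.ResourceNotFoundError, which corresponds to an HTTP 404, so catching that exception covers the status-code check:

from azure.core.exceptions import ResourceNotFoundError

data_list = []
for store_id, date in stores_dates_gen:
    id_date_blob = f'{store_id}/{date.strftime("%Y/%m/%d/%H_%M_%S")}/file.json'

    try:
        # One network call per blob: download directly instead of exists() + download
        data = azure_container_client.read_blob(blob=id_date_blob)
    except ResourceNotFoundError:
        # HTTP 404: no blob for this id/date combination, move on to the next one
        continue

    data_list.append((store_id, date, data))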
