I have an Azure container holding thousands of blobs, each stored under an id/year/month/day/hour_minute_second/file.json path. I want to download, in Python, every file.json for a given id between start_date and end_date. For this I use BlobServiceClient from the azure Python package. Before downloading each JSON file, I check whether the blob exists with the get_blob_client(blob=blob_dir).exists() method.
from itertools import product

from azure.storage.blob import BlobServiceClient
import json
import pandas as pd


class AzureContainerClient(object):
    def __init__(self, account_url="https://mystorage.blob.core.windows.net/",
                 container_name='json',
                 credential="azure"):
        self.account_url = account_url
        self.container_name = container_name
        self.credential = credential
        # Connect to Azure storage
        self.__connect()

    def __connect(self):
        """
        Connect to the Azure container self.container_name.
        :return:
        """
        self.blob_client_server = BlobServiceClient(account_url=self.account_url, credential=self.credential)
        self.container_client = self.blob_client_server.get_container_client(container=self.container_name)

    def close(self) -> None:
        """
        Close the connection.
        :return: None
        """
        self.blob_client_server.close()

    def is_exist(self, blob: str) -> bool:
        """
        Return True if blob exists in self.container_name, else False.
        :param blob: blob path
        :return:
        """
        return self.container_client.get_blob_client(blob=blob).exists()

    def read_blob(self, blob: str) -> dict:
        """
        Read the blob from container_client.
        :param blob: blob path
        :return:
        """
        data = self.container_client.get_blob_client(blob=blob).download_blob().readall()
        # Parse the downloaded bytes as JSON
        data = json.loads(data)
        return data


def get_files(ids: list, start_date: str, end_date: str) -> pd.DataFrame:
    """
    Get the JSON files for ids from start_date to end_date.
    :param ids:
    :param start_date:
    :param end_date:
    :return:
    """
    date_range = pd.date_range(start=start_date, end=end_date, freq='H')
    # (id, timestamp) pairs for every id and hour between start_date and end_date
    stores_dates_gen = product(ids, date_range)
    azure_container_client = AzureContainerClient()
    data_list = []
    for store_id, date in stores_dates_gen:
        # Build the blob path: id/year/month/day/hour_minute_second/file.json
        id_date_blob = f'{store_id}/{date.strftime("%Y/%m/%d/%H_%M_%S")}/file.json'
        # Skip (id, date) combinations that have no blob in the container
        if not azure_container_client.is_exist(id_date_blob):
            continue
        data = azure_container_client.read_blob(blob=id_date_blob)
        data_list.append((store_id, date, data))
    df = pd.DataFrame(data=data_list, columns=['id', 'dateTime', 'data'])
    return df
However, this is far too slow, mainly because of the existence check (azure_container_client.is_exist(id_date_blob)). What is the fastest way to download all the blobs that do exist?
1 Answer
Instead of checking whether each blob exists, try reading it directly. That means you can drop the existence check from get_files entirely, i.e. the lines:
if not azure_container_client.is_exist(id_date_blob):
    continue
If the blob does not exist, the read operation raises an error.
What you need to do is catch that error and inspect its status code: a status code of 404 means the blob you tried to read does not exist, so you simply move on to the next blob in the list.
This will cut the runtime substantially, because you have halved the number of network operations.
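A minimal sketch of that change, assuming the azure-storage-blob SDK raises azure.core.exceptions.ResourceNotFoundError (its exception for an HTTP 404 response) when the blob is missing; the helper name read_blob_or_none is illustrative, not part of the SDK:

import json
from azure.core.exceptions import ResourceNotFoundError

def read_blob_or_none(container_client, blob: str):
    """Download and parse a JSON blob, or return None if it does not exist."""
    try:
        data = container_client.get_blob_client(blob=blob).download_blob().readall()
        return json.loads(data)
    except ResourceNotFoundError:
        # Raised when the service answers HTTP 404: the blob is absent, skip it.
        return None

The download loop in get_files then becomes:

for store_id, date in stores_dates_gen:
    id_date_blob = f'{store_id}/{date.strftime("%Y/%m/%d/%H_%M_%S")}/file.json'
    data = read_blob_or_none(azure_container_client.container_client, id_date_blob)
    if data is None:
        continue
    data_list.append((store_id, date, data))

Each candidate blob now costs one request (the download attempt) instead of two (existence check plus download), which is where the halving of network operations comes from.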