Azure数据工厂、批处理服务、Python:无法使用csv区块追加blob

tsm1rwdh  于 12个月前  发布在  Python
关注(0)|答案(1)|浏览(125)

Azure Data Facory pipeline将Batch Service作为链接服务运行自定义Activity,其中包含Python代码。该转换是为本地运行而构建的,我想让它在Azure上运行,将blob(csv文件)保存到Azure存储(Az Data Lake)中。
转换在块上运行,并在for循环中执行。

for i, in_chunk in enumerate(pd.read_csv(source, chunksize=6000, sep=";")
    # transformation happens and spits out out_chunk parameter 
    # holding the 6000 rows out of the entire file
    kwargs = {'mode':'a', 'header': False} if i>0 else {}
    out_chunk.to_csv(cdl_file, sep="|", index=False, **kwargs)

字符串
在此之后,我尝试了不同的方法,因为它是写在这个问题和答案例如:我原来的问题,另一个问题
在上面的问题和答案中编写的解决方案没有抛出错误,但它没有存储整个文件,只存储指定为块的6000行。
我做错了什么?我不明白这应该如何处理。
编辑:根据JayashankarGS的要求,我添加了我尝试的代码和发生的事情的截图。

def transform_data(v, outputType, eventType, fileName, config, source, conn_string, dl, encoding, in_dtypes = None,  chunkSize=10000, in_sep = ";"):
folderName = 'temp'
containerName = 'input'
outputBlobName = folderName + "/" + fileName
inputBlobPath = containerName + "/" + outputBlobName
blob = BlobClient.from_connection_string(conn_str=conn_string, container_name=containerName, blob_name=outputBlobName)

credential = {'connection_string': conn_string}
accountName = conn_string.split("AccountName=")[1].split(";")[0] 

adls_path = 'abfs://' + containerName + '@' + accountName + '.dfs.core.windows.net/' + outputBlobName

template = pd.DataFrame(columns = v.headers[outputType])
transformationSchema = config[outputType + "_out_transformations"]

logging.info('Reading data chunk...')
for i, in_chunk in enumerate(pd.read_csv(source,
            chunksize = chunkSize,
            sep=in_sep,
            encoding = 'unicode_escape',
            dtype=in_dtypes)):
    logging.info('Handle duplicates and missing fields...')

    in_chunk.fillna('', inplace=True) 
    out_chunk = template.copy()

    out_chunk.fillna('', inplace=True)

    out_chunk.drop_duplicates(subset=[config["composite_key"]["key_partA"], config["composite_key"]["key_partB"], config["composite_key"]["key_partC"]],inplace=True)

    logging.info('Start data transformation for schema: ' + outputType)

    for name, spec in transformationSchema.items():
        out_chunk[name] = transform_column(in_chunk, spec)

    kwargs = {'mode': 'a', 'header': False}

    logging.info('Transformation was successful for ' + outputType)
    dateTime = time.strftime("%Y%m%d%H%M%S")

    if not os.path.exists(containerName + "/" + folderName + "/"):
            os.mkdir(containerName)
            os.mkdir(containerName + "/" + folderName + "/")

    print(f"Uploading chunk: {len(out_chunk)}")
    logging.info('Trying to store transformed file in Azure Storage...')

    out_chunk.to_csv(adls_path, storage_options=credential, sep=dl, index=False, **kwargs)


这样做的结果是生成并存储在Azure存储中的两个文件。正如您在Azure Data Factory上使用Batch Service运行此操作的结果中所看到的,它将按照给定的批处理大小处理10000行,然后尝试对第二个文件执行相同的操作。“未找到文件”错误来自转换后的一个验证程序步骤(忽略该警告!)。
x1c 0d1x的数据

shyt4zoc

shyt4zoc1#

原因是有不同的blob类型。
在您的代码中:

adls_path = 'abfs://[email protected]/outfromVS.csv'
credentials = {'connection_string': blob_conn}

for i, in_chunk in enumerate(pd.read_csv('./hts-sample-test.csv', chunksize=500, sep=";")):
    kwargs = {'mode': 'a', 'header': False} if i > 0 else {}
    print(in_chunk.count())
    in_chunk.to_csv(adls_path, storage_options=credentials, **kwargs)

字符串
对于第一次迭代,您没有传递mode;默认情况下,这会使文件类型为Block blob
x1c 0d1x的数据
写入存储时,BlobType应该相同。
因此,只需从一开始就给予模式:kwargs = {'mode': 'a', 'header': False}
下面是我的数据统计:



现在上传一个大小为500的块:

for i, in_chunk in enumerate(pd.read_csv('./hts-sample-test.csv', chunksize=500)):
    kwargs = {'mode': 'a', 'header': False}
    print(f"Uploading chunk: {len(in_chunk)}")
    in_chunk.to_csv(adls_path, storage_options=credentials, **kwargs)


输出:



再次,获取文件:

备注:第一次运行输出文件时,请确保存储帐户中不存在输出文件,如果存在,则应为Append blob类型。

相关问题