如何使用持久函数将CSV文件添加到Blob服务

hs1ihplo  于 2023-11-14  发布在  其他
关注(0)|答案(1)|浏览(141)

在Azure持久函数中,我想在Blob服务中额外写入CSV。
我想做的
1.选择要由Blob服务写入的CSV文件。
1.创建数据框
1.将数据框添加到在1中选择的CSV文件。
我使用python(编程模型v2),下面是简化的代码。

import  azure.functions  as  func
import  azure.durable_functions  as  df
import  logging  
import  numpy as np
import  pandas as pd

app  =  df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)  

### client function ###
@app.route(route="orchestrators/client_function")
@app.durable_client_input(client_name="client")
async  def  client_function(req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse:
    instance_id  =  await  client.start_new("orchestrator", None, {})
    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    await  client.wait_for_completion_or_create_check_status_response(req, instance_id)
    status  =  await  client.get_status(instance_id)
    return  f"output: {status.output}"  

### orchestrator function ###
@app.orchestration_trigger(context_name="context")
def orchestrator(context: df.DurableOrchestrationContext) -> dict:
    result = yield context.call_activity("main", '')
    return "Inserted"

### activity function ###
@app.blob_input(arg_name="inputblob", path="newblob/test.test", connection="BlobStorageConnection")
@app.blob_output(arg_name="outputblob", path="newblob/test.csv", connection="BlobStorageConnection")
@app.activity_trigger(input_name="blank")
def main(blank: str, inputblob: str, outputblob: func.Out[str]):
    data = np.random.rand(100) 
    df = pd.DataFrame(data)
    '''
    I don't know how to add to an already existing CSV file.
    outputblob.set(csv_data) ?
    '''
    return "Inserted"

字符串

oxcyiej7

oxcyiej71#

在Azure持久函数中,我想在Blob服务中额外写入CSV。
我尝试了下面的代码,并收到了下面的结果:

import azure.functions as func
import azure.durable_functions as df
import os
from azure.storage.blob import BlobServiceClient
 
myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)
 
def blob_service_client():
    conn_str= os.getenv('connectionstring')
    return BlobServiceClient.from_connection_string(conn_str)
 
@myApp.route(route="orchestrators/{functionName}")
@myApp.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client):
    function_name = req.route_params.get('functionName')
    instance_id = await client.start_new(function_name)
    response = client.create_check_status_response(req, instance_id)
    return response
 
@myApp.orchestration_trigger(context_name="context")
def hello_orchestrator(context):
    result= yield context.call_activity("hello","")
    return result
    
@myApp.activity_trigger(input_name="city")
def hello(city: str):
    update= blob_service_client().get_blob_client(container="cont16391", blob="SampleDataFile.csv")
    blob_data = update.download_blob().readall() + "newcontent".encode()
    update.upload_blob(blob_data, overwrite= True)
    return "inserted"

字符串

输出:


的数据



相关问题