如何将blob触发的启动器函数的输入传递到Azure持久函数的编排器中的Activity函数?

anauzrmj  于 2023-04-22  发布在  其他
关注(0)|答案(2)|浏览(78)

正如标题所说,我有一个blob触发的启动器函数,我希望将触发启动器的blob作为输入发送到activity函数。我可以选择使用.start_new()方法传入输入,但这些输入必须是JSON可序列化的,而blob文件(func.InputStream)不是。
我尝试解码InputStream对象(myblob),它允许我将其作为输入传递,但我失去了一些基本功能,当它是InputStream对象时,我还需要确保我能够将其传递到将在Activity函数中调用的后续认知服务中。请参阅下面的starter函数代码。
启动器功能

函数.json`

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "myblob",
      "type": "blobTrigger",
      "direction": "in",
      "path": "container/{name}",
      "connection": "conn str"
    },
    {
      "name": "$return",
      "type": "blob",
      "path": "container/{name}",
      "connection": "AzureWebJobsStorage",
      "direction": "out"
    },
    {
      "name": "starter",
      "type": "durableClient",
      "direction": "in"
    }
  ]
}

`init.py

async def main(myblob: func.InputStream, starter: str) -> func.InputStream:
    logging.info(f"Python blob trigger function processed blob \n"
                 f"Name: {myblob.name}\n"
                 f"Blob Size: {myblob.length} bytes\n\n")        
    client = df.DurableOrchestrationClient(starter)
    instance_id = await client.start_new('Orchestrator', client_input=myblob) ##Client_input must be json serializable
    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    return myblob

返回:异常:类型错误:class〈class 'azure.functions.blob. InputStream'〉不公开to_json函数

guykilcj

guykilcj1#

活动函数也可以有其他常规的输入和输出绑定。因此,您可以将blob的详细信息作为输入传递给活动函数,并使用这些详细信息使用blob输入绑定来获取blob。

3pmvbmvn

3pmvbmvn2#

使用blobtrigger有更多的问题,所以切换到事件网格触发器。将触发编排的blob的名称传递给活动函数(从事件数据中检索),并使用azure sdk将文件下载到内存中,效果非常好。
远离blob触发器的原因:
1.高延迟,标准的blob触发器实际上不是事件驱动的,而是依赖于轮询
1.出于类似的原因,原木也不能保证
1.在处理大文件时(我们的用例要求上限为2.5gb),blob触发器可能是计算密集型的,有时触发器会由于blob的大小/长度而失败。
以下链接的文档中提供了部分信息:
https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-storage-blob-trigger?pivots=programming-language-python&tabs=python-v2%2Cin-process

相关问题