python 使用Azure持久函数进行聚合

lo8azlld  于 2023-04-28  发布在  Python
关注(0)|答案(1)|浏览(106)

我被卡住了。..我需要一个Azure函数,它能够接收一定数量的数据包,然后在接收到所有数据后对这些数据执行一些操作。该函数是通过HTTP请求调用的,最初该请求包含一些参数,其中包括要接收的数据包的数量。我有一个客户端函数,它接收数据并启动orchestrator。..但无论我做什么,我的orchestrator都会失败,并出现TypeError:coroutine类型的对象在发送任何数据之前都不是JSON可序列化的错误。要发送的数据是一系列整数,所需的输出是接收到的整数的计数(所以没什么复杂的)
期望函数在第一次调用时返回示例id,然后函数将被调用n次以接收每个整数,一旦完成,它将输出计数。
有人能给我指出正确的方向吗?我可能在这个过程中错过了一个片段,但我似乎无法弄清楚为什么配器失败了。这是我的orchestrator函数:

async def orchestrator_function(context: df.DurableOrchestrationContext):

input_data = context.get_input()
n_count = int(input_data['n_count'])
# there are some other parameters, but they are not relevant

my_data = []

for i in range(n_count):
    result = await context.wait_for_external_event("get_data")
    my_data.append(result)

total_received = len(df)

return total_received
5gfr0r5j

5gfr0r5j1#

我实际上能够弄清楚这一点,所以对于其他希望使用持久函数聚合数据的人来说,这里是一个简化的例子。在此示例中,该函数由Power Automate流使用HTTP POST调用。数据包的预期数量作为查询(n_count)传递。对初始函数调用的响应产生sendEventPostUri,然后通过HTTP POST将数据发送到该Uri。一旦收到所有数据,函数返回总数(通过HTTP GET)。
客户端如下:

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    n_count = int(req.params.get('n_count')) 
    instance_id = await client.start_new('my_aggregator', None, {'n_count': n_count })

    return client.create_check_status_response(req, instance_id)

协调器如下所示:

def orchestrator_function(context: df.DurableOrchestrationContext):
    try: 
        input_data = context.get_input()
        n_count = int(input_data['n_count'])
        my_data = []

        for i in range(n_count):
            result = yield context.wait_for_external_event("get_data")
            my_data.append(result)
           
        total = sum(my_data)

        return total

    except Exception as e:
        context.set_custom_status("Failed")
        raise

相关问题