python-3.x 如何在Trio中收集任务结果?

dwbf0jvd  于 2023-03-04  发布在  Python
关注(0)|答案(5)|浏览(122)

我写了一个脚本,它使用了一个托儿所和一个ask模块来循环并基于循环变量调用一个API,我得到了响应,但不知道如何像使用asyncio那样返回数据。
我还有一个关于将API限制为每秒5个的问题。

from datetime import datetime
import asks
import time
import trio

asks.init("trio")
s = asks.Session(connections=4)

async def main():
    start_time = time.time()

    api_key = 'API-KEY'
    org_id = 'ORG-ID'
    networkIds = ['id1','id2','idn']

    url = 'https://api.meraki.com/api/v0/networks/{0}/airMarshal?timespan=3600'
    headers = {'X-Cisco-Meraki-API-Key': api_key, 'Content-Type': 'application/json'}

    async with trio.open_nursery() as nursery:
        for i in networkIds:
            nursery.start_soon(fetch, url.format(i), headers)

    print("Total time:", time.time() - start_time)


async def fetch(url, headers):
    print("Start: ", url)
    response = await s.get(url, headers=headers)
    print("Finished: ", url, len(response.content), response.status_code)



if __name__ == "__main__":
    trio.run(main)

当我运行nursery.start_soon(fetch...)时,我在fetch中打印数据,但是我如何返回数据呢?我没有看到任何类似于asyncio.gather(*tasks)函数的东西。
此外,我可以将会话数限制为1-4,这有助于低于每秒5个API的限制,但我想知道是否有内置的方法来确保在任何给定的一秒内调用的API不超过5个?

t9eec4r0

t9eec4r01#

返回数据:将网络ID和一个dict传递给fetch任务:

async def main():
    …
    results = {}
    async with trio.open_nursery() as nursery:
        for i in networkIds:
            nursery.start_soon(fetch, url.format(i), headers, results, i)
    ## results are available here

async def fetch(url, headers, results, i):
    print("Start: ", url)
    response = await s.get(url, headers=headers)
    print("Finished: ", url, len(response.content), response.status_code)
    results[i] = response

或者,创建一个trio.Queue,将结果put到其中;然后,您的主任务可以从队列中读取结果。
API限制:创建一个trio.Queue(10)并沿着以下方式启动任务:

async def limiter(queue):
    while True:
        await trio.sleep(0.2)
        await queue.put(None)

将该队列作为另一个参数传递给fetch,并在每次API调用之前调用await limit_queue.get()

qvsjd97n

qvsjd97n2#

从技术上讲,trio.Queue在trio 0.9中已被弃用,它已被trio.open_memory_channel取代。
简短示例:

sender, receiver = trio.open_memory_channel(len(networkIds)
async with trio.open_nursery() as nursery:
    for i in networkIds:
        nursery.start_soon(fetch, sender, url.format(i), headers)

async for value in receiver:
    # Do your job here
    pass

fetch函数中,应该在某处调用async sender.send(value)

a14dhokn

a14dhokn3#

基于this answers,可以定义以下函数:

async def gather(*tasks):

    async def collect(index, task, results):
        task_func, *task_args = task
        results[index] = await task_func(*task_args)

    results = {}
    async with trio.open_nursery() as nursery:
        for index, task in enumerate(tasks):
            nursery.start_soon(collect, index, task, results)
    return [results[i] for i in range(len(tasks))]

然后,您可以通过简单地修补trio(添加gather函数),以与asyncio完全相同的方式使用trio:

import trio
trio.gather = gather

下面是一个实际的例子:

async def child(x):
    print(f"Child sleeping {x}")
    await trio.sleep(x)
    return 2*x

async def parent():
    tasks = [(child, t) for t in range(3)]
    return await trio.gather(*tasks)

print("results:", trio.run(parent))
wqnecbli

wqnecbli4#

当我运行nursery.start_soon(fetch...)时,我在fetch中打印数据,但是我如何返回数据呢?我没有看到任何类似于asyncio.gather(*tasks)函数的东西。
你问了两个不同的问题,所以我只回答这个。马蒂亚斯已经回答了你的另一个问题。
当你调用start_soon()时,你要求Trio在后台运行任务,然后继续运行。这就是为什么Trio能够同时运行fetch()几次的原因。但是因为Trio一直在运行,所以没有办法像Python函数通常那样“返回”结果。它甚至会返回到哪里呢?
您可以使用队列让fetch()任务将结果发送给另一个任务进行额外处理。
要创建队列:

response_queue = trio.Queue(capacity=len(networkIds))

当你开始你的fetch任务时,把队列作为一个参数传递,并在你完成之后发送一个sentintel到队列:

async with trio.open_nursery() as nursery:
    for i in networkIds:
        nursery.start_soon(fetch, url.format(i), headers, response_queue)
await response_queue.put(None)

下载URL后,将响应放入队列:

async def fetch(url, headers, response_queue):
    print("Start: ", url)
    response = await s.get(url, headers=headers)
    # Add responses to queue
    await response_queue.put(response)
    print("Finished: ", url, len(response.content), response.status_code)

通过以上更改,获取任务将把响应放入队列中。现在您需要从队列中读取响应,以便处理它们。您可以添加一个新函数来完成此操作:

async def process(response_queue):
    async for response in response_queue:
        if response is None:
            break
        # Do whatever processing you want here.

在启动任何获取任务之前,您应该将此处理函数作为后台任务启动,以便它在收到响应后立即处理响应。
请参阅Trio文档的“任务之间的同步和通信”部分了解更多信息。

j2qf4p5b

j2qf4p5b5#

正如@Adrien Clerc的回答所说:trio.Queue已弃用:https://trio.readthedocs.io/en/stable/history.html?highlight=trio.Queue#id40
有关Trio中的任务通信,请参见:https://trio.readthedocs.io/en/latest/reference-core.html#using-channels-to-pass-values-between-tasks
下面是使用open_memory_channel的用例的一个完整的最小工作示例(删除asyncrlget请求并替换为sleep)

import datetime
import trio

async def main():
    network_ids = ["id1", "id2", "idn"]
    url = "https://api.meraki.com/api/v0/networks/{0}/airMarshal?timespan=3600"
    send_channel, receive_channel = trio.open_memory_channel(len(network_ids))
    async with trio.open_nursery() as nursery:
        nursery.start_soon(producer, send_channel, url, network_ids)
        nursery.start_soon(consumer, receive_channel)

async def producer(send_channel, url, network_ids):
    async with send_channel:
        async with trio.open_nursery() as nursery:
            for i in network_ids:
                nursery.start_soon(fetch, send_channel, url.format(i))

async def consumer(receive_channel):
    async with receive_channel:
        async for value in receive_channel:
            # Do your job here
            print(f"value received: {value} at time {datetime.datetime.utcnow()}")

async def fetch(send_channel, url):
    print(f"Start: {datetime.datetime.utcnow()}")
    await trio.sleep(1)
    response = f"response for {url}"
    await send_channel.send(response)
    print(f"Finished: {datetime.datetime.utcnow()}")

if __name__ == "__main__":
    trio.run(main)

这将打印:

Start: 2023-03-03 10:21:24.883787
Start: 2023-03-03 10:21:24.883787
Start: 2023-03-03 10:21:24.883787
Finished: 2023-03-03 10:21:25.887040
Finished: 2023-03-03 10:21:25.887040
Finished: 2023-03-03 10:21:25.887040
value received: response for https://api.meraki.com/api/v0/networks/id1/airMarshal?timespan=3600 at time 2023-03-03 10:21:25.887040
value received: response for https://api.meraki.com/api/v0/networks/id2/airMarshal?timespan=3600 at time 2023-03-03 10:21:25.887040
value received: response for https://api.meraki.com/api/v0/networks/idn/airMarshal?timespan=3600 at time 2023-03-03 10:21:25.887040

相关问题