我有一个每分钟都在监听的事件,它返回一个列表;它可以为空、1个元素或更多。对于列表中这些元素,我希望运行一个函数,该函数将在10分钟内每分钟监视该元素上的一个事件。
为此我写了那个剧本
from concurrent.futures import ThreadPoolExecutor
from time import sleep
import asyncio
import Client
client = Client()
def handle_event(event):
for i in range(10):
client.get_info(event)
sleep(60)
async def main():
while True:
entires = client.get_new_entry()
if len(entires) > 0:
with ThreadPoolExecutor(max_workers=len(entires)) as executor:
executor.map(handle_event, entires)
await asyncio.sleep(60)
if __name__ == "__main__":
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
但是,它不是一直监视条目,而是在仍监视以前的条目时阻塞。
你知道我该怎么做吗?
1条答案
按热度按时间ryhaxcpt1#
首先让我解释一下为什么你的程序不能按照你想要的方式工作:这是因为我们使用ThreadPoolExecutor作为上下文管理器,它在调用map所启动的所有线程都完成之前不会关闭,所以main()会在那里等待,直到所有工作都完成,循环的下一次迭代才会发生。
有很多方法可以解决这个问题。因为你已经在使用asyncio,一种方法是将Executor的创建转移到一个单独的任务中。主循环的每次迭代都会启动这个任务的一个副本,这个副本会一直运行到任务完成。这是一个async def函数,所以这个任务的许多副本可以并发运行。
我在代码中做了一些改动,只是使用了一些简单的print语句,而不是Client语句,我将一个随机长度的整数列表传递给handle_event,每次while_True语句结束时,计数器都会递增:循环,并将计数器的10倍加到列表中的每个整数上。这使得很容易看到旧调用是如何继续一段时间,与新调用混合在一起的。我还缩短了您的时间延迟。所有这些更改都是为了方便,并不重要。
重要的变化是将ThreadPoolExecutor创建移到一个任务中,为了使它与其他任务协作,它必须包含一个等待表达式,因此我使用
executor.submit
而不是executor.map
。submit
返回concurrent.futures. Future,这提供了一种方便的方式来等待所有调用的完成。另一方面,executor.map
返回一个迭代器;我想不出什么好办法把它变成一个可等待的对象。要将concurrent.futures.Future转换为asyncio.Future(一个可等待的),有一个函数asyncio.wrap_future。当所有的futures都完成时,我退出ThreadPoolExecutor上下文管理器。这将非常快,因为Executor的所有工作都完成了,所以它不会阻塞其他任务。