python-3.x 保持在变量列表参数上创建线程的最佳方法

ycl3bljg  于 2023-01-27  发布在  Python
关注(0)|答案(1)|浏览(103)

我有一个每分钟都在监听的事件,它返回一个列表;它可以为空、1个元素或更多。对于列表中这些元素,我希望运行一个函数,该函数将在10分钟内每分钟监视该元素上的一个事件。
为此我写了那个剧本

from concurrent.futures import ThreadPoolExecutor
from time import sleep
import asyncio
import Client

client = Client()

def handle_event(event):
    for i in range(10):
        client.get_info(event)
        sleep(60)

async def main():
    while True:
        entires = client.get_new_entry()

        if len(entires) > 0:
            with ThreadPoolExecutor(max_workers=len(entires)) as executor:
                executor.map(handle_event, entires)

        await asyncio.sleep(60)

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())

但是,它不是一直监视条目,而是在仍监视以前的条目时阻塞。
你知道我该怎么做吗?

ryhaxcpt

ryhaxcpt1#

首先让我解释一下为什么你的程序不能按照你想要的方式工作:这是因为我们使用ThreadPoolExecutor作为上下文管理器,它在调用map所启动的所有线程都完成之前不会关闭,所以main()会在那里等待,直到所有工作都完成,循环的下一次迭代才会发生。
有很多方法可以解决这个问题。因为你已经在使用asyncio,一种方法是将Executor的创建转移到一个单独的任务中。主循环的每次迭代都会启动这个任务的一个副本,这个副本会一直运行到任务完成。这是一个async def函数,所以这个任务的许多副本可以并发运行。
我在代码中做了一些改动,只是使用了一些简单的print语句,而不是Client语句,我将一个随机长度的整数列表传递给handle_event,每次while_True语句结束时,计数器都会递增:循环,并将计数器的10倍加到列表中的每个整数上。这使得很容易看到旧调用是如何继续一段时间,与新调用混合在一起的。我还缩短了您的时间延迟。所有这些更改都是为了方便,并不重要。
重要的变化是将ThreadPoolExecutor创建移到一个任务中,为了使它与其他任务协作,它必须包含一个等待表达式,因此我使用executor.submit而不是executor.mapsubmit返回concurrent.futures. Future,这提供了一种方便的方式来等待所有调用的完成。另一方面,executor.map返回一个迭代器;我想不出什么好办法把它变成一个可等待的对象。
要将concurrent.futures.Future转换为asyncio.Future(一个可等待的),有一个函数asyncio.wrap_future。当所有的futures都完成时,我退出ThreadPoolExecutor上下文管理器。这将非常快,因为Executor的所有工作都完成了,所以它不会阻塞其他任务。

import random
from concurrent.futures import ThreadPoolExecutor
from time import sleep
import asyncio

def handle_event(event):
    for i in range(10):
        print("Still here", event)
        sleep(2)
        
async def process_entires(counter, entires):
    print("Counter", counter, "Entires", entires)
    x = [counter * 10 + a for a in entires]
    with ThreadPoolExecutor(max_workers=len(entires)) as executor:
        futs = []
        for z in x:
            futs.append(executor.submit(handle_event, z))
        await asyncio.gather(*(asyncio.wrap_future(f) for f in futs))

async def main():
    counter = 0
    while True:
        entires = [0, 1, 2, 3, 4][:random.randrange(5)]

        if len(entires) > 0:
            counter += 1
            asyncio.create_task(process_entires(counter, entires))

        await asyncio.sleep(3)

if __name__ == "__main__":
    asyncio.run(main())

相关问题