信号量Python是如何工作的

uemypmqf  于 2023-03-31  发布在  Python
关注(0)|答案(1)|浏览(161)

我想检查大约100 ips地址的可达性,并使用信号量设置并发任务的限制。但现在我不确定这到底是如何工作的,也不知道为什么它在代码示例中不起作用。正如我所观察到的,函数“task_reachable”仍然正确执行。如果没有地址可达,然后在“try_ssh_connection”中,“所有”任务都是并行执行的,这使得代码非常慢。

class test_class():
    def __init__(self):
        self.username = "username"
        self.password = "password"

        self.ips = open("open_ip_list")

    def create_async(self):

        asyncio.run(self.create_tasks())

    async def boundary_task(self,ip):
        sem = asyncio.Semaphore(2)
        async with sem:
            return await self.task_test(ip)

    async def create_tasks(self):
        timer = Timer(text=f" task time: {{:.1f}}")
        timer.start()
        tasks = [
            asyncio.ensure_future(self.boundary_task(i))
            for i
            in self.ips
        ]
        await asyncio.gather(*tasks)
        timer.stop()

    async def task_test(self):
        pass
    
if __name__ == "__main__":
    app = test_class()
    app.create_async()
kcrjzv8t

kcrjzv8t1#

您的问题是每个运行的boundary_task示例都有自己的信号量。

async def boundary_task(self, ip):
    sem = asyncio.Semaphore(2)

如果希望它们都使用相同的信号量,则boundary_task的所有示例都需要共享它。

async def boundary_task(self, ip, semaphore):
    async with sem:
        return await self.task_reachable(ip)

async def create_tasks(self):
    sem = asyncio.Semaphore(2)
    tasks = [
        self.boundary_task(i, sem)
        for i
        in self.ips
    ]
    await asyncio.gather(*tasks)

由于使用的是类,因此也可以在__init__内部创建信号量。

def __init__(self):
    ...

    self.sem = asyncio.Semaphore(2)

async def boundary_task(self, ip):
    async with self.sem:
        return await self.task_reachable(ip)

相关问题