python—如何从init运行异步协同路由,请等待它完成

5ktev3wc  于 2021-06-09  发布在  Redis
关注(0)|答案(1)|浏览(324)

我正在连接来自 __init__ (我不想把它搬走,因为这意味着我要做一些额外的重大改变)。我怎么能等aioredis连接 task 在下面 __init__ 示例代码并打印出来 self.sub 以及 self.pub 反对?现在它给出了一个错误
.py:42>exception=attributeerror(“'s'对象没有属性'pub'”)
我确实看到redis连接被创建和coro create_connetion 完成。
有没有一种方法可以阻止来自 __init__ . 如果我替换 asyncio.waitasyncio.run_until_complete 我得到一个错误粗略地说
循环已在运行。 asyncio.gather

import sys, json
from addict import Dict
import asyncio
import aioredis

class S():
    def __init__(self, opts):
        print(asyncio.Task.all_tasks())
        task = asyncio.wait(asyncio.create_task(self.create_connection()), return_when="ALL_COMPLETED")
        print(asyncio.Task.all_tasks())
        print(task)
        print(self.pub, self.sub)

    async def receive_message(self, channel):
        while await channel.wait_message():
            message = await channel.get_json()
            await asyncio.create_task(self.callback_loop(Dict(json.loads(message))))

    async def run_s(self):
        asyncio.create_task(self.listen())
        async def callback_loop(msg):
            print(msg)

        self.callback_loop = callback_loop

    async def create_connection(self):
        self.pub = await aioredis.create_redis("redis://c8:7070/0", password="abc")
        self.sub = await aioredis.create_redis("redis://c8:7070/0", password="abc")
        self.db = await aioredis.create_redis("redis://c8:7070/0", password="abc")
        self.listener = await self.sub.subscribe(f"abc")

    async def listen(self):
        self.tsk = asyncio.ensure_future(self.receive_message(self.listener[0]))
        await self.tsk

async def periodic(): #test function to show current tasks
    number = 5
    while True:
        await asyncio.sleep(number)
        print(asyncio.Task.all_tasks())

async def main(opts):
    loop.create_task(periodic())
    s = S(opts)
    print(s.pub, s.sub)
    loop.create_task(s.run_s())

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    main_task = loop.create_task(main(sys.argv[1:]))
    loop.run_forever() #I DONT WANT TO MOVE THIS UNLESS IT IS NECESSARY
iyfjxgzm

iyfjxgzm1#

我认为您要做的是确保函数create\u connections在s构造函数之前运行到完成。一种方法是重新排列代码。将create\u connections函数移到类外:

async def create_connection():
    pub = await aioredis.create_redis("redis://c8:7070/0", password="abc")
    sub = await aioredis.create_redis("redis://c8:7070/0", password="abc")
    db = await aioredis.create_redis("redis://c8:7070/0", password="abc")
    listener = await self.sub.subscribe(f"abc")
    return pub, sub, db, listener

现在在构造s之前等待该函数。所以你的主要功能是:

async def main(opts):
    loop.create_task(periodic())
    x = await create_connections()
    s = S(opts, x)  # pass the result of create_connections to S
    print(s.pub, s.sub)
    loop.create_task(s.run_s())

现在修改s构造函数以接收创建的对象:

def __init__(self, opts, x):
    self.pub, self.sub, self.db, self.listener = x

我不确定您要用return\u when参数和对asyncio.wait的调用做什么。create\u connections函数不会启动一组并行任务,而是等待每个调用,然后再转到下一个。也许您可以通过并行运行这四个调用来提高性能,但这是另一个问题。

相关问题