postgresql asyncpg将监听程序添加到池

4sup72z8  于 2022-11-04  发布在  PostgreSQL
关注(0)|答案(1)|浏览(139)
  • 下午好-好
    我正在尝试使用PostgreSQL中的NOTIFY/LISTEN功能,如asyncpg文档中所示,我们可以向Connection对象添加侦听器,但不能向Pool对象添加侦听器,我已经尝试了this问题中所示的解决方案,代码如下:
def listener(*args):
    print("ANYTHING")

async def main():
    creds = {}

    async def add_listeners(conn) -> None:
        await conn.add_listener("listener_channel", listener)

    pool = await asyncpg.create_pool(**creds, setup=add_listeners)
    await asyncio.sleep(10000)

asyncio.run(main())

然后跑步

NOTIFY listener_channel

在PgAdmin 4中。
然而-什么也没发生。我怎么能让它工作呢?

chy5wohz

chy5wohz1#

好吧,看起来它不工作,因为所有的连接都是空闲的,我想出了这个解决方案

import asyncio
import asyncpg

class ListenerConnection(asyncpg.Connection):
    def __init__(self, *args,**kwargs):
        self._listeners_futures: dict[str: list[asyncio.Future] = {}

    def _dummy_callback(self, channel):
        def wrapper(*args):
            if channel not in self._listeners_futures:
                return
            for fut in self._listeners_futures[channel]:
                fut.set_result(None)
            self._listeners_futures[channel].remove(fut)
        return wrapper

    async def add_one_time_listener(self, channel):
        callback = self._dummy_callback(channel)
        await self.add_listener(channel, callback)

    async def listen(self, channel):
        await self.add_one_time_listener(channel)
        future = self._loop.create_future()

        if channel not in self._listeners_futures:
            self._listeners_futures[channel] = []
        self._listeners_futures[channel].append(future)

        return await future

async def main():
    pool = await asyncpg.create_pool(**creds, connection_class=ListenerConnection)
    async with pool.acquire() as conn:
        await conn.listen("some_channel")
        print("ANYTHING")

asyncio.run(main())

相关问题