如何在python中对nats连接进行单元/功能测试

efzxgjgh  于 2021-09-29  发布在  Java
关注(0)|答案(0)|浏览(482)

python中有一个异步函数,用于建立nats连接和订阅主题。为这段代码编写单元(和/或)功能测试的最佳方法是什么?

from nats.aio.client import Client as NATS

async def run(loop):

        nc = NATS()
        logger.info("Connecting to nats...")
        await nc.connect(NATS_URL, loop=loop)
        logger.info("NATS connected!")

        # Subscribe with queue
        ssid_disconnect = await nc.subscribe(topic_ep_disconnected, cb=message_handler_ep_disconnected, queue=subscription_queue_disconnect)
        ssid_tse = await nc.subscribe(topic_ep_time_series_event, cb=message_handler_ep_tsd, queue=subscription_queue_tsd)

        return ssid_disconnect, ssid_tse

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题