如何为python asyncio创建自定义事件

agxfikkp  于 2021-06-26  发布在  Mesos
关注(0)|答案(1)|浏览(383)

mesos调度器有一个异步httpapi。e、 当一个人通过发布一个json来部署一个应用程序时 /v2/apps 将返回部署id。然后,可以使用该id在中轮询部署状态 /v2/deployments 或者通过订阅 /v2/events 去找找 deployment_success 事件。
我想创建一个带有协程的异步python客户机。例如。 client.deploy_app(...) 应该在 deployment_success 事件到达但未阻塞。
如何用asyncio实现这些方法?如何创建事件侦听器?它感觉一个事件循环是为这个,但我不知道我如何注册事件。

sf6xfgos

sf6xfgos1#

创建所需的异步post http请求 /v2/apps 可以用 aiohttp 模块:

import asyncio
import aiohttp

async def post(url, json):
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=json) as resp:
            return await resp.json()

async def main():
    res = await post('http://httpbin.org/post', {'test': 'object'})
    print(res['json'])  # {'test': 'object'}

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
    loop.run_until_complete(loop.shutdown_asyncgens())
finally:
    loop.close()

如果你想用 /v2/events 要跟踪部署成功,您应该请求流(参见api文档)。它可以在 aiohttp 使用异步迭代:您只需异步地逐行读取内容,等待所需的事件,例如:

import asyncio
import aiohttp

async def stream(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            async for line in resp.content:
                yield line

async def main():
    async for line in stream('http://httpbin.org/stream/10'):
        print(line)  # check if line contains event you need

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
    loop.run_until_complete(loop.shutdown_asyncgens())
finally:
    loop.close()

如果你想用 /v2/deployments 您应该定期请求它,并使用 asyncio.sleep . 在这种情况下,您的函数不会被阻塞:

import asyncio
import aiohttp

async def get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.json()

async def main():
    while True:       
        # Make request to see if deplayment finished:
        res = await get('http://httpbin.org/get')
        print(res['origin'])  # break if ok here
        # Async wait before next try:
        await asyncio.sleep(3)

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
    loop.run_until_complete(loop.shutdown_asyncgens())
finally:
    loop.close()

相关问题