python-3.x 如何在同一个事件循环中运行另一个应用程序?

zlwx9yxi  于 2023-05-02  发布在  Python
关注(0)|答案(1)|浏览(219)

我希望我的FastAPI应用程序能够始终访问python-telegram-bot的实际bot_data。我需要它,这样当我调用FastAPI中的某个端点时,例如,可以向所有聊天发送消息,存储在bot_data中。
正如我所理解的问题:bot.run_polling()uvicorn.run(...)启动两个独立的异步循环。我需要在一个运行。

UPD-1:

感谢@MatsLindh,我创建了下一个函数,我传递给main块,但它的工作不一致。有时 www.example.com _polling()(获取正确的循环,一切正常,但其他时候会出现错误,即有不同的循环):

import asyncio
from uvicorn import Config, Server
# --snip--
def run(app: FastAPI, bot:Application):
    # using get_event_loop leads to:
    # RuntimeError: Cannot close a running event loop
    # I guess it is because bot.run_polling()
    # calls loop.run_until_complete() different tasks
    # loop = asyncio.get_event_loop()
    loop = asyncio.new_event_loop()
    server = Server(Config(app=app, port=9001))
    loop.create_task(server.serve())

    t = Thread(target=loop.run_forever)
    t.start()

    bot.run_polling()

    t.join()
# --snip--
if __name__ == "__main__":
# --snip--
    run(f_app, bot_app)

我也知道我可以将bot.run_polling()分解成几个单独的调用,这些调用在内部聚集,但我确信它应该只使用shortuct函数。

姓名首字母

我的简化设置如下所示。
最初我尝试不使用线程而是使用multiprocessing.Proccess运行,但是这样我的bot_data总是空的-我认为这是因为机器人数据在进程之间不共享,所以整个事情必须在一个进程中。在这里,我失败了,在一个异步循环中运行所有这些东西。

# main.py
# python3.10
# pip install fastapi[all] python-telegram-bot
from threading import Thread

import uvicorn
from telegram.ext import Application, ApplicationBuilder, PicklePersistence
from fastapi import FastAPI, Request

BOT_TOKEN = "telegram-bot-token"
MY_CHAT = 123456

class MyApp(FastAPI):
    def add_bot(self, bot_app: Application):
        self.bot_app = bot_app

async def post_init(app: Application):
    app.bot_data["key"] = 42

f_app = MyApp()

@f_app.get("/")
async def test(request: Request):
   app: MyApp = request.app
   bot_app: Application = app.bot_app
   val = bot_app.bot_data.get('key')
   print(f"{val=}")
   await bot_app.bot.send_message(MY_CHAT, f"Should be 42: {val}")

if __name__ == "__main__":
    pers = PicklePersistence("storage")
    bot_app = ApplicationBuilder().token(BOT_TOKEN).post_init(post_init).persistence(pers).build()
    f_app.add_bot(bot_app)

    t1 = Thread(target=uvicorn.run, args=(f_app,), kwargs={"port": 9001})
    t1.start()

    # --- Launching polling in main thread causes
    # telegram.error.NetworkError: Unknown error in HTTP implementation:
    # RuntimeError('<asyncio.locks.Event object at 0x7f2764e6fd00 [unset]> is bound to a different event loop')
    # message is sent and value is correct, BUT app breaks and return 500
    # bot_app.run_polling()

    # --- Launching polling in separate thread causes
    # RuntimeError: There is no current event loop in thread 'Thread-2 (run_polling)'.
    # t2 = Thread(target=bot_app.run_polling)
    # t2.start()

    # --- Launching with asyncio causes:
    # ValueError: a coroutine was expected, got <bound method Application.run_polling ...
    # import asyncio
    # t2 = Thread(target=asyncio.run, args=(bot_app.run_polling,))
    # t2.start()

    t1.join()
xlpyo6sf

xlpyo6sf1#

调用uvicorn.run()时,会创建一个新的event loop(在内部调用asyncio.run()-请参阅链接的源代码)。当启动uvicorn服务器(以及FastAPI应用程序)后尝试启动另一个应用程序时-或者反之亦然-这也会创建一个新的事件循环,例如您的Telegram bot应用程序,启动其他应用程序的代码行将不会到达,直到退出已经运行的事件循环。这是因为运行事件循环是阻塞,意味着它将阻塞调用线程**,直到事件循环终止**。
如果您还尝试在已经使用事件循环的应用程序中运行其他应用程序(本质上是事件循环),或者尝试调用asyncio.run(),或者应用程序中有多个对loop.run_until_complete()的调用,则会遇到以下错误:

> RuntimeError: Cannot run the event loop while another loop is running
> RuntimeError: asyncio.run() cannot be called from a running event loop
> RuntimeError: This event loop is already running

有几种方法可以解决这个问题。出于演示的目的,下面给出的解决方案使用一个简单的打印应用程序作为第二个应用程序,该应用程序也创建了一个事件循环。此应用程序如下:

printing_app。py

import asyncio

async def go():
    counter = 0
    while True:
        counter += 1
        print(counter)
        await asyncio.sleep(1)

       
def run():
    asyncio.run(go())

方案一

您可以使用uvicorn.Server.serve()从已经运行的async环境中运行uvicorn。首先,使用asyncio.new_event_loop()创建新的事件循环,然后使用asyncio.set_event_loop()将其设置为当前线程的当前事件循环。接下来,通过使用loop.create_task()并向其传递协程来调度另一个异步应用程序的执行(即:即,协程对象是调用async def函数的结果),而不是执行asyncio.run()函数的方法。在上面的printing_app.py中,这就是go()函数。封装在任务中的协程可能不会立即运行。它被调度并将在事件循环找到执行任务的机会时立即运行-如this answer中所述,当当前运行的协程到达await表达式以及async forasync with块时可能会发生这种情况,因为这些操作在后台使用await
最后,使用loop.run_until_complete()运行uvicorn服务器,通过传递uvicorn.Server.serve()协程-如果传递给loop.run_until_complete()的参数是协程,则将其 Package 在Task中(参见相关实现以及上面链接的文档);因此,这次不需要在协程上调用loop.create_task()。它将执行所提供的任务并阻塞,直到任务完成。
为了清楚起见,asyncio.new_event_loop(),后面是asyncio.set_event_loop()loop.run_until_complete()是使用asyncio.run()时在幕后实际发生的事情-请参阅最新的Python的Runner类实现,以及Python 3中run()方法的实现。10(可能更清楚)。
P.S.也可以使用create_task()创建每个任务,最后调用loop.run_forever(),它将永远运行事件循环,直到通过调用其stop()方法显式停止它。另一方面,loop.run_until_complete()将一直运行,直到您传递给它的任务完成并返回其结果。根据个人的需要,以及他们必须执行的任务的性质,可以在两者之间进行选择。

示例1
from fastapi import FastAPI
import printing_app
import asyncio
import uvicorn

app = FastAPI()

@app.get('/')
def main():
    return 'Hello World!'
    

def start_uvicorn(loop):
    config = uvicorn.Config(app, loop=loop)
    server = uvicorn.Server(config)
    loop.run_until_complete(server.serve())
    

def start_printing_app(loop):
    loop.create_task(printing_app.go())  # passs go() (coroutine), not run() 

            
if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    start_printing_app(loop)
    start_uvicorn(loop)
示例二

由于这是一个FastAPI应用程序,您可以像往常一样运行服务器(使用uvicorn.run(app)),并利用FastAPI/Starlette的Lifespan eventsstartup上执行第二个应用程序。要执行它,您可以使用asyncio.create_task(),它将把协程 Package 到一个任务中,如前所述,并调度它的执行。该任务将在asyncio.get_running_loop()返回的循环中执行,该循环返回当前线程中的事件循环。或者,您可以自己调用asyncio.get_running_loop()来获取正在运行的事件循环,然后使用create_task()函数来执行任务,如前所述。

from fastapi import FastAPI
from contextlib import asynccontextmanager
import asyncio
import printing_app
import uvicorn

@asynccontextmanager
async def lifespan(app: FastAPI):
    asyncio.create_task(printing_app.go())
    # Alternatively:
    #loop = asyncio.get_running_loop()
    #loop.create_task(printing_app.go())
    yield

app = FastAPI(lifespan=lifespan)

@app.get('/')
def main():
    return 'Hello World!'
    

if __name__ == '__main__':
    uvicorn.run(app)

方案二

另一种解决方案是使用nest_asyncio,如here所示,它允许在嵌套环境中运行多个asyncio事件循环。但是,通常建议避免使用嵌套事件循环,因为它可能会导致意外行为。

在FastAPI应用中运行Telegram Bot应用

正如相关库的维护者在github上的评论中提到的,使用Application.run_polling()纯粹是可选的,并且会阻塞事件循环,直到用户发送停止信号;这就是run_polling()不适合与ASGI框架(如FastAPI)结合使用的原因。在这种情况下,您可以手动调用run_polling()在后台实际运行的方法。一个展示如何在Starlette应用程序上运行uvicorn服务器的示例,沿着一个电报机器人应用程序,可以看到here。基于该示例和上面提供的所有信息,提供了以下解决方案。

示例1
from fastapi import FastAPI
import asyncio
import uvicorn

app = FastAPI()

@app.get('/')
def main():
    return 'Hello World!'
    

async def main():
    config = uvicorn.Config(app, host='0.0.0.0', port=8000)
    server = uvicorn.Server(config)
    
    application = .... # initialise your telegram-bot app
    
    # Run application and webserver together
    async with application:
        await application.start()
        await server.serve()
        await application.stop()

if __name__ == '__main__':
    asyncio.run(main())
示例二
from fastapi import FastAPI
from contextlib import asynccontextmanager
import uvicorn

@asynccontextmanager
async def lifespan(app: FastAPI):
    application = .... # initialise your telegram-bot app
    await application.start()
    yield
    await application.stop()

  
app = FastAPI(lifespan=lifespan)

@app.get('/')
def main():
    return 'Hello World!'

    
if __name__ == '__main__':
    uvicorn.run(app)

相关问题