Web Services 如何在Python中使用Flask执行周期性任务

polhcujo  于 2022-11-15  发布在  Python
关注(0)|答案(4)|浏览(222)

我一直在使用Flask为我的k8055 USB接口板提供一个简单的web API;相当标准的吸气剂和推杆, flask 真的让我的生活轻松了很多。
但是我希望能够在乳清发生时/附近记录状态的变化。
例如,如果我有一个按钮连接到板上,我可以为那个特定的端口轮询api。但是如果我想让输出直接反映输出,不管有没有人在和api说话,我都会有这样的东西。

while True:
    board.read()
    board.digital_outputs = board.digital_inputs
    board.read()
    time.sleep(1)

每一秒钟,输出都会更新,以匹配输入。
有没有办法在Flask下做这种事情?我以前在Twisted中做过类似的事情,但是Flask对于这个特定的应用程序来说太方便了,现在还不能给予它...

  • 谢谢-谢谢
bq9c1y66

bq9c1y661#

对于我的Flask应用程序,我考虑使用Pashka在他的answerschedule库和APScheduler中描述的cron方法。
我发现APScheduler很简单,并且服务于周期性任务运行的目的,所以继续使用APScheduler。
示例代码:

from flask import Flask

from apscheduler.schedulers.background import BackgroundScheduler

app = Flask(__name__)

def test_job():
    print('I am working...')

scheduler = BackgroundScheduler()
job = scheduler.add_job(test_job, 'interval', minutes=1)
scheduler.start()
wvmv3b1j

wvmv3b1j2#

您可以使用cron执行简单的任务。
为任务创建 flask 视图。

# a separate view for periodic task
@app.route('/task')
def task():
    board.read()
    board.digital_outputs = board.digital_inputs

然后使用cron定期从该url下载

# cron task to run each minute
0-59 * * * * run_task.sh

其中run_task. sh的内容为

wget http://localhost/task

如果您需要更高的频率(比如,每5秒=每分钟12次),则必须在tun_task. sh中按以下方式执行

# loop 12 times with a delay
for i in 1 2 3 4 5 6 7 8 9 10 11 12
do
    # download url in background for not to affect delay interval much
    wget -b http://localhost/task
    sleep 5s
done
tpgth1q7

tpgth1q73#

由于某种原因,Antony's code对我不起作用。我没有收到任何错误消息或任何东西,但test_job函数不会运行。
我安装了Flask-APScheduler,然后使用下面的代码使其工作,这是安东尼的代码和this Techcoil article示例的混合。

from flask import Flask
from flask_apscheduler import APScheduler

app = Flask(__name__)

def test_job():
    print('I am working...')

scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()
scheduler.add_job(id='test-job', func=test_job, trigger='interval', seconds=1)
q3qa4bjr

q3qa4bjr4#

不,Flask中不支持任务,但您可以使用flask-celery或在单独的线程(greenlet)中运行函数。

相关问题