我用的是气流版本 1.10.11
,并有一个简单的测试dag,如下所示:
from datetime import datetime as dt
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from backend.logic.games import refresh_game_data
dag = DAG(
dag_id='update_game_data',
schedule_interval='@once',
start_date=dt.utcnow()
)
def refresh_game_with_context(**kwargs):
game_id = kwargs['dag_run'].conf['game_id']
refresh_game_data(game_id)
refresh_game_data_task = PythonOperator(
task_id="refresh_game_data",
python_callable=refresh_game_with_context,
dag=dag,
provide_context=True
)
refresh_game_data_task
当我通过ui手动调用任务时,它会成功,不会出现任何问题,并生成预期的输出:
调用cli时,我得到以下输出:
root@b324f7099e97:/home/backend# airflow trigger_dag 'update_game_data' --conf '{"game_id":3}'
[2020-07-30 02:46:06,264] {default_celery.py:88} WARNING - You have configured a result_backend of redis://redis, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-07-30 02:46:06,267] {__init__.py:50} INFO - Using executor CeleryExecutor
[2020-07-30 02:46:06,267] {dagbag.py:396} INFO - Filling up the DagBag from /home/backend/airflow/dags/update_game_data.py
/usr/local/lib/python3.7/dist-packages/pymysql/cursors.py:170: Warning: (1300, "Invalid utf8mb4 character string: '800495'")
result = self._query(query)
Created <DagRun update_game_data @ 2020-07-30 02:46:07+00:00: manual__2020-07-30T02:46:07+00:00, externally triggered: True>
与python中的本地客户端类似的结果:
In [1]: from airflow.api.client.local_client import Client
...:
...: afc = Client(None, None)
...: res = afc.trigger_dag(dag_id='update_game_data', conf={"game_id": 3})
[2020-07-30 02:46:43,612] {default_celery.py:88} WARNING - You have configured a result_backend of redis://redis, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-07-30 02:46:43,618] {__init__.py:50} INFO - Using executor CeleryExecutor
[2020-07-30 02:46:43,619] {dagbag.py:396} INFO - Filling up the DagBag from /home/backend/airflow/dags/update_game_data.py
/usr/local/lib/python3.7/dist-packages/pymysql/cursors.py:170: Warning: (1300, "Invalid utf8mb4 character string: '800495'")
result = self._query(query)
但是,在这两种情况下,任务实际上都不会发送到airflow celery worker,就像通过ui触发时一样。奇怪的是,在这两种情况下,ui都显示dag运行成功,但没有日志数据,也没有将dag中的任务标记为成功:
我确信这就是问题所在,但是可以用一个指向正确方向的指针。我使用redis作为我的任务后端,rmq作为我的代理,mysql作为我的metadb,如下所述。
1条答案
按热度按时间7ajki6be1#
我从外部触发的dag样本开始,没有任何问题。然而,当我开始交换自己的逻辑时,它开始崩溃。然后我发现这篇文章警告不要使用动态开始日期。有趣的是,虽然动态开始日期似乎可以摆脱气流调度器,但外部触发的示例使用了一个,而且它是有效的。
我两个都试过了
datetime.now()
以及datetime.utcnow()
在这两种情况下,都会抛出一个错误,即任务执行日期必须大于开始日期。我试着把时间戳转换成日期,但是我不喜欢时间戳中没有tz信息。最终解决的问题是放弃了对utcnow()/now()
并将我的dag设置为固定的开始日期,如下所示:通过固定的开始日期和遗漏的tz信息,我能够得到这个工作。为什么,确切地说,这是工作的,为什么上面的dag工作通过用户界面,而不是通过外部触发是一个问题,我将留给我们中间的气流忍者。这对于一次性、外部触发的DAG很好。如果/当我们的应用程序开始使用airflow作为beat调度程序时,我们将看到是否遇到同样的问题。目前,我们将celery 与redis结合使用来管理作业计划。