我正在处理一个关于airflow的任务,其中一部分是试图知道DAG的任务上次成功执行是什么时候。我发现这个问题here非常有用。它基本上是查询airflow postgres数据库。我在容器部署中使用airflow。版本2.4.2
这是一个函数:
from typing import List, Dict, Any, Optional
from airflow.utils.state import State
from airflow.settings import Session
from airflow.models.taskinstance import TaskInstance
def last_execution_date(
dag_id: str, task_id: str, n: int, session: Optional[Session] = None
) -> List[str]:
"""
This function is to queries against airflow table and
return the most recent execution date
Args:
dag_id: dag name
task_id : task name
n : number of runs
session: Session to connect airflow postgres db
Returns:
list of execution date of most of recent n runs
"""
session = Session()
query_val = (
session.query(TaskInstance)
.filter(
TaskInstance.dag_id == dag_id,
TaskInstance.task_id == task_id,
TaskInstance.state == State.SUCCESS
)
.order_by(TaskInstance.execution_date.desc())
.limit(n)
)
execution_dates = list(map(lambda ti: ti.execution_date, query_val))
return execution_dates
它会产生以下错误:
ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1803, in _execute_context
cursor, statement, parameters, context
File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
cursor.execute(statement, parameters)
psycopg2.errors.SyntaxError: syntax error at or near "DESC"
LINE 5: ..._id = task_instance.run_id AND dag_run.execution_date DESC)
^
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/***/.local/lib/python3.7/site-packages/***/decorators/base.py", line 188, in execute
return_value = super().execute(context)
File "/home/***/.local/lib/python3.7/site-packages/***/operators/python.py", line 175, in execute
return_value = self.execute_callable()
File "/home/***/.local/lib/python3.7/site-packages/***/operators/python.py", line 193, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/***/dags/process_minio_images.py", line 33, in last_exec_date_task
last_exec_date_val = utils.last_execution_date(dag_id=dag_id, task_id=task_id, n=n)
File "/opt/***/dags/utils.py", line 55, in last_execution_date
execution_dates = list(map(lambda ti: ti.execution_date, query_val))
File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 2887, in __iter__
return self._iter().__iter__()
File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 2897, in _iter
execution_options={"_sa_orm_load_options": self.load_options},
File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1689, in execute
result = conn._execute_20(statement, params or {}, execution_options)
File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1614, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 326, in _execute_on_connection
self, multiparams, params, execution_options
File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1491, in _execute_clauseelement
cache_hit=cache_hit,
File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
e, statement, parameters, cursor, context
File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 2027, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
raise exception
File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1803, in _execute_context
cursor, statement, parameters, context
File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.errors.SyntaxError) syntax error at or near "DESC"
LINE 5: ..._id = task_instance.run_id AND dag_run.execution_date DESC)
^
[SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.map_index AS task_instance_map_index, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs, dag_run_1.state AS dag_run_1_state, dag_run_1.id AS dag_run_1_id, dag_run_1.dag_id AS dag_run_1_dag_id, dag_run_1.queued_at AS dag_run_1_queued_at, dag_run_1.execution_date AS dag_run_1_execution_date, dag_run_1.start_date AS dag_run_1_start_date, dag_run_1.end_date AS dag_run_1_end_date, dag_run_1.run_id AS dag_run_1_run_id, dag_run_1.creating_job_id AS dag_run_1_creating_job_id, dag_run_1.external_trigger AS dag_run_1_external_trigger, dag_run_1.run_type AS dag_run_1_run_type, dag_run_1.conf AS dag_run_1_conf, dag_run_1.data_interval_start AS dag_run_1_data_interval_start, dag_run_1.data_interval_end AS dag_run_1_data_interval_end, dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision, dag_run_1.dag_hash AS dag_run_1_dag_hash, dag_run_1.log_template_id AS dag_run_1_log_template_id
FROM task_instance JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.task_id = %(task_id_1)s AND task_instance.state = %(state_1)s ORDER BY EXISTS (SELECT 1
FROM dag_run
WHERE dag_run.dag_id = task_instance.dag_id AND dag_run.run_id = task_instance.run_id AND dag_run.execution_date DESC)
LIMIT %(param_1)s]
[parameters: {'dag_id_1': 'image_processing', 'task_id_1': 'load_image', 'state_1': <TaskInstanceState.SUCCESS: 'success'>, 'param_1': 2}]
(Background on this error at: https://sqlalche.me/e/14/f405)
在格式化sqlalchemy生成的SQL查询后,在错误的结尾:
SELECT
task_instance.try_number AS task_instance_try_number ,
task_instance.task_id AS task_instance_task_id ,
task_instance.dag_id AS task_instance_dag_id ,
task_instance.run_id AS task_instance_run_id ,
task_instance.map_index AS task_instance_map_index ,
task_instance.start_date AS task_instance_start_date ,
task_instance.end_date AS task_instance_end_date ,
task_instance.duration AS task_instance_duration ,
task_instance.state AS task_instance_state ,
task_instance.max_tries AS task_instance_max_tries ,
task_instance.hostname AS task_instance_hostname ,
task_instance.unixname AS task_instance_unixname ,
task_instance.job_id AS task_instance_job_id ,
task_instance.pool AS task_instance_pool ,
task_instance.pool_slots AS task_instance_pool_slots ,
task_instance.queue AS task_instance_queue ,
task_instance.priority_weight AS task_instance_priority_weight ,
task_instance.operator AS task_instance_operator ,
task_instance.queued_dttm AS task_instance_queued_dttm ,
task_instance.queued_by_job_id AS task_instance_queued_by_job_id ,
task_instance.pid AS task_instance_pid ,
task_instance.executor_config AS task_instance_executor_config ,
task_instance.external_executor_id AS task_instance_external_executor_id,
task_instance.trigger_id AS task_instance_trigger_id ,
task_instance.trigger_timeout AS task_instance_trigger_timeout ,
task_instance.next_method AS task_instance_next_method ,
task_instance.next_kwargs AS task_instance_next_kwargs ,
dag_run_1.state AS dag_run_1_state ,
dag_run_1.id AS dag_run_1_id ,
dag_run_1.dag_id AS dag_run_1_dag_id ,
dag_run_1.queued_at AS dag_run_1_queued_at ,
dag_run_1.execution_date AS dag_run_1_execution_date ,
dag_run_1.start_date AS dag_run_1_start_date ,
dag_run_1.end_date AS dag_run_1_end_date ,
dag_run_1.run_id AS dag_run_1_run_id ,
dag_run_1.creating_job_id AS dag_run_1_creating_job_id ,
dag_run_1.external_trigger AS dag_run_1_external_trigger ,
dag_run_1.run_type AS dag_run_1_run_type ,
dag_run_1.conf AS dag_run_1_conf ,
dag_run_1.data_interval_start AS dag_run_1_data_interval_start ,
dag_run_1.data_interval_end AS dag_run_1_data_interval_end ,
dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision,
dag_run_1.dag_hash AS dag_run_1_dag_hash ,
dag_run_1.log_template_id AS dag_run_1_log_template_id
FROM
task_instance
JOIN
dag_run AS dag_run_1
ON
dag_run_1.dag_id = task_instance.dag_id
AND dag_run_1.run_id = task_instance.run_id
WHERE
task_instance.dag_id = %(dag_id_1)s
AND task_instance.task_id = %(task_id_1)s
AND task_instance.state = %(state_1)s
ORDER BY
EXISTS
(
SELECT
1
FROM
dag_run
WHERE
dag_run.dag_id = task_instance.dag_id
AND dag_run.run_id = task_instance.run_id
AND dag_run.execution_date
DESC
)
LIMIT
%(param_1)s
似乎在SQL查询的order by语句中确实存在语法错误。问题是什么?是来自我使用的SQLAlchemy还是气流错误?我们应该如何解决这个问题?
1条答案
按热度按时间icnyk63a1#
order_by
子句使用了EXISTS
语句,但它会出现暂停问题,只需使用连接表dag_run_1.execution_date
中的相应列即可