docker | Airflow's session.query() generates a SQL query with a syntax error when fetching the date-time of the last task execution

Posted by lnlaulya on 2023-04-20 in Docker

I am working on an Airflow task where one part is to find out when a DAG's task was last successfully executed. I found this question here very useful. It basically queries the Airflow Postgres database. I am running Airflow in a containerized deployment, version 2.4.2.
Here is the function:

from typing import List, Dict, Any, Optional
from airflow.utils.state import State
from airflow.settings import Session
from airflow.models.taskinstance import TaskInstance

def last_execution_date(
    dag_id: str, task_id: str, n: int, session: Optional[Session] = None
) -> List[str]:
    """
    This function is to queries against airflow table and
    return the most recent execution date
    Args:
        dag_id: dag name
        task_id : task name
        n : number of runs
        session: Session to connect airflow postgres db
    Returns:
        list of execution date of most of recent n runs
    """
    session = Session()
    query_val = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == dag_id,
            TaskInstance.task_id == task_id,
            TaskInstance.state == State.SUCCESS
        )
        .order_by(TaskInstance.execution_date.desc())
        .limit(n)
    )
    execution_dates = list(map(lambda ti: ti.execution_date, query_val))
    return execution_dates

It produces the following error:

ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1803, in _execute_context
    cursor, statement, parameters, context
  File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.SyntaxError: syntax error at or near "DESC"
LINE 5: ..._id = task_instance.run_id AND dag_run.execution_date DESC) 
                                                                 ^

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/***/.local/lib/python3.7/site-packages/***/decorators/base.py", line 188, in execute
    return_value = super().execute(context)
  File "/home/***/.local/lib/python3.7/site-packages/***/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/home/***/.local/lib/python3.7/site-packages/***/operators/python.py", line 193, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/***/dags/process_minio_images.py", line 33, in last_exec_date_task
    last_exec_date_val = utils.last_execution_date(dag_id=dag_id, task_id=task_id, n=n)
  File "/opt/***/dags/utils.py", line 55, in last_execution_date
    execution_dates = list(map(lambda ti: ti.execution_date, query_val))
  File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 2887, in __iter__
    return self._iter().__iter__()
  File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 2897, in _iter
    execution_options={"_sa_orm_load_options": self.load_options},
  File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1689, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
  File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1614, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 326, in _execute_on_connection
    self, multiparams, params, execution_options
  File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1491, in _execute_clauseelement
    cache_hit=cache_hit,
  File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
    e, statement, parameters, cursor, context
  File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 2027, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1803, in _execute_context
    cursor, statement, parameters, context
  File "/home/***/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.errors.SyntaxError) syntax error at or near "DESC"
LINE 5: ..._id = task_instance.run_id AND dag_run.execution_date DESC) 
                                                                 ^

[SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.map_index AS task_instance_map_index, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs, dag_run_1.state AS dag_run_1_state, dag_run_1.id AS dag_run_1_id, dag_run_1.dag_id AS dag_run_1_dag_id, dag_run_1.queued_at AS dag_run_1_queued_at, dag_run_1.execution_date AS dag_run_1_execution_date, dag_run_1.start_date AS dag_run_1_start_date, dag_run_1.end_date AS dag_run_1_end_date, dag_run_1.run_id AS dag_run_1_run_id, dag_run_1.creating_job_id AS dag_run_1_creating_job_id, dag_run_1.external_trigger AS dag_run_1_external_trigger, dag_run_1.run_type AS dag_run_1_run_type, dag_run_1.conf AS dag_run_1_conf, dag_run_1.data_interval_start AS dag_run_1_data_interval_start, dag_run_1.data_interval_end AS dag_run_1_data_interval_end, dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision, dag_run_1.dag_hash AS dag_run_1_dag_hash, dag_run_1.log_template_id AS dag_run_1_log_template_id 
FROM task_instance JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id 
WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.task_id = %(task_id_1)s AND task_instance.state = %(state_1)s ORDER BY EXISTS (SELECT 1 
FROM dag_run 
WHERE dag_run.dag_id = task_instance.dag_id AND dag_run.run_id = task_instance.run_id AND dag_run.execution_date DESC) 
 LIMIT %(param_1)s]
[parameters: {'dag_id_1': 'image_processing', 'task_id_1': 'load_image', 'state_1': <TaskInstanceState.SUCCESS: 'success'>, 'param_1': 2}]
(Background on this error at: https://sqlalche.me/e/14/f405)

After formatting the SQL query that SQLAlchemy generated (shown at the end of the error):

SELECT
        task_instance.try_number           AS task_instance_try_number          ,
        task_instance.task_id              AS task_instance_task_id             ,
        task_instance.dag_id               AS task_instance_dag_id              ,
        task_instance.run_id               AS task_instance_run_id              ,
        task_instance.map_index            AS task_instance_map_index           ,
        task_instance.start_date           AS task_instance_start_date          ,
        task_instance.end_date             AS task_instance_end_date            ,
        task_instance.duration             AS task_instance_duration            ,
        task_instance.state                AS task_instance_state               ,
        task_instance.max_tries            AS task_instance_max_tries           ,
        task_instance.hostname             AS task_instance_hostname            ,
        task_instance.unixname             AS task_instance_unixname            ,
        task_instance.job_id               AS task_instance_job_id              ,
        task_instance.pool                 AS task_instance_pool                ,
        task_instance.pool_slots           AS task_instance_pool_slots          ,
        task_instance.queue                AS task_instance_queue               ,
        task_instance.priority_weight      AS task_instance_priority_weight     ,
        task_instance.operator             AS task_instance_operator            ,
        task_instance.queued_dttm          AS task_instance_queued_dttm         ,
        task_instance.queued_by_job_id     AS task_instance_queued_by_job_id    ,
        task_instance.pid                  AS task_instance_pid                 ,
        task_instance.executor_config      AS task_instance_executor_config     ,
        task_instance.external_executor_id AS task_instance_external_executor_id,
        task_instance.trigger_id           AS task_instance_trigger_id          ,
        task_instance.trigger_timeout      AS task_instance_trigger_timeout     ,
        task_instance.next_method          AS task_instance_next_method         ,
        task_instance.next_kwargs          AS task_instance_next_kwargs         ,
        dag_run_1.state                    AS dag_run_1_state                   ,
        dag_run_1.id                       AS dag_run_1_id                      ,
        dag_run_1.dag_id                   AS dag_run_1_dag_id                  ,
        dag_run_1.queued_at                AS dag_run_1_queued_at               ,
        dag_run_1.execution_date           AS dag_run_1_execution_date          ,
        dag_run_1.start_date               AS dag_run_1_start_date              ,
        dag_run_1.end_date                 AS dag_run_1_end_date                ,
        dag_run_1.run_id                   AS dag_run_1_run_id                  ,
        dag_run_1.creating_job_id          AS dag_run_1_creating_job_id         ,
        dag_run_1.external_trigger         AS dag_run_1_external_trigger        ,
        dag_run_1.run_type                 AS dag_run_1_run_type                ,
        dag_run_1.conf                     AS dag_run_1_conf                    ,
        dag_run_1.data_interval_start      AS dag_run_1_data_interval_start     ,
        dag_run_1.data_interval_end        AS dag_run_1_data_interval_end       ,
        dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision,
        dag_run_1.dag_hash                 AS dag_run_1_dag_hash                ,
        dag_run_1.log_template_id          AS dag_run_1_log_template_id
FROM
        task_instance
JOIN
        dag_run AS dag_run_1
ON
        dag_run_1.dag_id = task_instance.dag_id
AND     dag_run_1.run_id = task_instance.run_id
WHERE
        task_instance.dag_id  = %(dag_id_1)s
AND     task_instance.task_id = %(task_id_1)s
AND     task_instance.state   = %(state_1)s
ORDER BY
        EXISTS
        (
                SELECT
                        1
                FROM
                        dag_run
                WHERE
                        dag_run.dag_id = task_instance.dag_id
                AND     dag_run.run_id = task_instance.run_id
                AND     dag_run.execution_date 
                DESC
        ) 
LIMIT 
        %(param_1)s

There does seem to be a syntax error in the ORDER BY clause of the generated SQL query. What is the problem? Does it come from the way I am using SQLAlchemy, or is it an Airflow bug? And how should it be fixed?


icnyk63a 1#

The order_by clause is being rendered as an EXISTS expression, which produces invalid SQL. Instead, join the dag_run table and order by the corresponding column of the joined table (dag_run_1.execution_date in the generated SQL):

from airflow.models.dagrun import DagRun  # needed to order by the joined dag_run table

def last_execution_date(
    dag_id: str, task_id: str, n: int, session: Optional[Session] = None
) -> List[str]:
    """
    Query the Airflow metadata database and return the most recent
    execution dates of a task.
    Args:
        dag_id: dag name
        task_id: task name
        n: number of runs
        session: Session connected to the Airflow Postgres db
    Returns:
        list of execution dates of the n most recent runs
    """
    session = session or Session()
    query_val = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == dag_id,
            TaskInstance.task_id == task_id,
            TaskInstance.state == State.SUCCESS
        )
        # join the dag_run table and sort on its execution_date column
        .join(TaskInstance.dag_run)
        .order_by(DagRun.execution_date.desc())
        .limit(n)
    )
    execution_dates = list(map(lambda ti: ti.execution_date, query_val))
    return execution_dates
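
For reference, here is a minimal alternative sketch (not from the original answer): since in Airflow 2.x TaskInstance.execution_date is only an association proxy to DagRun.execution_date, you can query DagRun directly and let Airflow inject the session with @provide_session instead of opening one yourself. The function name last_execution_dates and the explicit join condition are assumptions for illustration.

from datetime import datetime
from typing import List

from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.utils.session import provide_session
from airflow.utils.state import State


@provide_session
def last_execution_dates(dag_id: str, task_id: str, n: int, session=None) -> List[datetime]:
    """Execution dates of the n most recent successful runs of a task (sketch)."""
    rows = (
        session.query(DagRun.execution_date)
        # explicit join on the dag_id/run_id pair that links the two tables
        .join(
            TaskInstance,
            (TaskInstance.dag_id == DagRun.dag_id)
            & (TaskInstance.run_id == DagRun.run_id),
        )
        .filter(
            TaskInstance.dag_id == dag_id,
            TaskInstance.task_id == task_id,
            TaskInstance.state == State.SUCCESS,
        )
        .order_by(DagRun.execution_date.desc())
        .limit(n)
        .all()
    )
    return [row.execution_date for row in rows]

Because the sort now targets a real column of the joined dag_run table, SQLAlchemy emits a plain ORDER BY dag_run.execution_date DESC instead of the invalid EXISTS expression.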
