docker - How can I trigger a DAG task after another task in Airflow with Python, regardless of whether the previous task succeeded?

g6baxovj · posted 2022-11-02 in Docker

Description

  • How can I run several ExternalPythonOperator tasks one after another (I need different packages/versions for different DAG tasks) without the downstream ones depending on upstream success, i.e. without them being skipped as "upstream_failed"?
  • In other words, the tasks should execute back to back, regardless of whether any of them succeeds or fails.
  • You might ask: why not just create separate DAG files? The point is that I want to run two resource-intensive tasks in clearly separated time windows, so that they cannot disrupt anything. They also have to stay decoupled from each other, because either one can break for reasons of its own (resource limits on this or other servers, or external causes), and that must not block the other. The mechanism I'm after is sketched right below.
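For context, Airflow exposes exactly this behaviour through trigger rules: a task's trigger_rule decides which upstream terminal states allow it to run, and "all_done" means "run once every upstream has finished, whatever its state". A minimal sketch with the plain @task decorator (placeholder task names, to be placed inside a with DAG(...) block):

from airflow.decorators import task
from airflow.utils.trigger_rule import TriggerRule

@task
def first():
    ...  # may succeed or fail

@task(trigger_rule=TriggerRule.ALL_DONE)  # runs even if `first` failed
def second():
    ...

first() >> second()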

My code

import logging
import os
import shutil
import sys
import tempfile
import time
from pprint import pprint

import pendulum

from airflow import DAG
from airflow.decorators import task

log = logging.getLogger(__name__)
PYTHON = sys.executable
BASE_DIR = tempfile.gettempdir()

my_default_args = {
    'owner': 'me',
    #'email': ['myemail@myemail.com'],
    'email_on_failure': True,
    #'email_on_retry': True,
    #'retries': 1,
    #'retry_delay': timedelta(minutes=1)
}

with DAG(
    dag_id='some_dag_id_comes_here',
    schedule='1 * * * *', 
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),  # Airflow counts schedule intervals from this date; it is NOT a cron-style "run at" time
    catchup=False,
    default_args=my_default_args,
    tags=['xyz1'],
    ) as dag:
    @task.external_python(task_id="task1", python='/opt/airflow/my_env/bin/python3')
    def func1(): 
        print('elements of task 1')
        time.sleep(10)

    @task.external_python(task_id="task2", python='/opt/airflow/my_env/bin/python3')
    def func2(): 
        print('elements of task 2')
        time.sleep(10)

    func1 >> func2  # chaining the decorated functions directly; this is what causes the errors below

What I have tried

Code 1

@task.external_python(task_id="task1", python='/opt/airflow/my_env/bin/python3',
trigger_rule=TriggerRule.ALL_DONE)

Error 1

Broken DAG: [/opt/airflow/dags/test_file.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/opt/airflow/dags/name.py", line 224, in <module>
    task1 >> task2 >> task3 >> task4 >> task5
TypeError: unsupported operand type(s) for >>: '_TaskDecorator' and '_TaskDecorator'
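This TypeError means `>>` was applied to the decorated functions themselves (type _TaskDecorator) rather than to task instances: a TaskFlow-decorated function only becomes a task when it is called inside the DAG. A minimal sketch of the difference, using the func1/func2 defined above:

t1 = func1()   # calling the decorated function creates the actual task (an XComArg)
t2 = func2()
t1 >> t2       # now >> chains tasks, not bare functions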

Code 2

@task.external_python(task_id="task1", python='/opt/airflow/my_env/bin/python3',
trigger_rule=TriggerRule.none_skipped)

Error 2

Broken DAG: [/opt/airflow/dags/test_file.py] Traceback (most recent call last):
  File "/opt/airflow/dags/test_file.py", line 51, in <module>
    ,trigger_rule=TriggerRule.none_skipped
  File "/usr/local/lib/python3.8/enum.py", line 384, in __getattr__
    raise AttributeError(name) from None
AttributeError: none_skipped
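This AttributeError comes from the TriggerRule enum itself: its members are uppercase, and the lowercase names are their string values. So the attribute form is TriggerRule.NONE_SKIPPED, while the plain string 'none_skipped' is also accepted:

from airflow.utils.trigger_rule import TriggerRule

TriggerRule.NONE_SKIPPED        # correct enum member (uppercase)
TriggerRule.NONE_SKIPPED.value  # == 'none_skipped', the string form Airflow also accepts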

h6my8fg2 · answer #1

The DAG, my solution

import logging
import os
import shutil
import sys
import tempfile
import time
from pprint import pprint

import pendulum

from airflow import DAG
from airflow.decorators import task

log = logging.getLogger(__name__)
PYTHON = sys.executable
BASE_DIR = tempfile.gettempdir()

my_default_args = {
    'owner': 'me',
    #'email': ['myemail@myemail.com'],
    'email_on_failure': True,
    #'email_on_retry': True,
    #'retries': 1,
    #'retry_delay': timedelta(minutes=1)
}

with DAG(
    dag_id='some_dag_id_comes_here',
    schedule='1 * * * *', 
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),  # Airflow counts schedule intervals from this date; it is NOT a cron-style "run at" time
    catchup=False,
    default_args=my_default_args,
    tags=['xyz1'],
    ) as dag:
    @task.external_python(task_id="task1", python='/opt/airflow/my_env/bin/python3',trigger_rule='all_done')
    def func1(): 
        import time  # external Python tasks run in a separate interpreter, so imports go inside the function
        print('elements of task 1')
        time.sleep(10)

    @task.external_python(task_id="task2", python='/opt/airflow/my_env/bin/python3',trigger_rule='all_done')
    def func2(): 
        import time
        print('elements of task 2')
        time.sleep(10)

    func1() >> func2()
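Note that trigger_rule='all_done' is just the string value of TriggerRule.ALL_DONE, so the enum form works the same way: task2 starts as soon as task1 reaches any terminal state (success or failed), which is exactly the "run back to back, regardless of outcome" behaviour asked for. The equivalent enum spelling, assuming the usual airflow.utils.trigger_rule import path:

from airflow.utils.trigger_rule import TriggerRule

@task.external_python(task_id="task1",
                      python='/opt/airflow/my_env/bin/python3',
                      trigger_rule=TriggerRule.ALL_DONE)  # same as 'all_done'
def func1():
    ...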
