描述
- 如何连续运行多个ExternalPythonOperator(我需要不同的包/版本用于不同的DAG任务),而不依赖于前一个任务的成功“upstream_fail”。
- 因此,它应该一个接一个地执行任务,而不关心它们中的任何一个是成功还是失败。
- 您可能会问,为什么不创建单独的DAG文件呢?关键是,我希望在一个非常独立的时间段内运行两个资源密集型任务,以确保它们不会造成任何中断。它们还必须彼此分离,因为每个服务器都可能因为服务器和其他服务器上的资源限制而彼此中断也有外部原因。
我的代码
import logging
import os
import shutil
import sys
import tempfile
import time
from pprint import pprint
import pendulum
from airflow import DAG
from airflow.decorators import task
log = logging.getLogger(__name__)
PYTHON = sys.executable
BASE_DIR = tempfile.gettempdir()
my_default_args = {
'owner': 'me',
#'email': ['myemail@myemail.com'],
'email_on_failure': True,
#'email_on_retry': True,
#'retries': 1,
# 'retry_delay': timedelta(minutes=1)
}
with DAG(
dag_id='some_dag_id_comes_here',
schedule='1 * * * *',
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), # this is from whre it starts counting time to run taks, NOT like cron
catchup=False,
default_args=my_default_args,
tags=['xyz1'],
) as dag:
@task.external_python(task_id="task1", python='/opt/airflow/my_env/bin/python3')
def func1():
print('elements of task 1')
time.sleep(10)
@task.external_python(task_id="task2", python='/opt/airflow/my_env/bin/python3')
def func2():
print('elements of task 2')
time.sleep(10)
task1 >> task2
我尝试过的事情#
- How to Trigger a DAG on the success of a another DAG in Airflow using Python?-我的问题是极性
- Triggering the external dag using another dag in Airflow-不知道
- How to schedule the airflow DAG to run just after the end of the previous running DAG?
- 我试过第二个-https://airflow.apache.org/docs/apache-airflow/stable/concepts/dags.html#trigger-rules
代码1
@task.external_python(task_id="task1", python='/opt/airflow/my_env/bin/python3',
trigger_rule=TriggerRule.ALL_DONE)
错误1
Broken DAG: [/opt/airflow/dags/test_file.py] Traceback (most recent call last):
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/opt/airflow/dags/name.py", line 224, in <module>
task1 >> task2 >> task3 >> task4 >> task5 TypeError: unsupported operand type(s) for >>: '_TaskDecorator' and '_TaskDecorator'
代码2
@task.external_python(task_id="task1", python='/opt/airflow/my_env/bin/python3',
trigger_rule=TriggerRule.none_skipped)
错误2
Broken DAG: [/opt/airflow/dags/test_file.py] Traceback (most recent call last):
File "/opt/airflow/dags/test_file.py", line 51, in <module>
,trigger_rule=TriggerRule.none_skipped
File "/usr/local/lib/python3.8/enum.py", line 384, in __getattr__
raise AttributeError(name) from None
AttributeError: none_skipped
1条答案
按热度按时间h6my8fg21#
DAG,我的解决方案