airflow:如何在一行中设置大量的外部依赖项?

zsbz8rwp  于 2021-06-01  发布在  Hadoop
关注(0)|答案(1)|浏览(377)

我读这个问题是为了实现对其他任务的依赖性 DAG s。在本例中,依赖关系编写为:

ExternalTaskSensor(
    task_id='wait_for_the_first_task_to_be_completed',
    external_dag_id='a',
    external_task_id='first_task',
    dag=dag) >> \

在我的数据仓库中,一个表可能依赖于数百个任务。使用此格式,它将生成 2*number of dependencies 代码行。这实在让人无法接受,还有更好的选择吗?
例如,在 Azkaban ,我可以这样编写多个依赖项:

dependencies = dag1.task1, dag2.task4, dag2.task5, DAG3.task2, etc...

感谢您的帮助。

omjgkv6w

omjgkv6w1#

您可以在循环中创建传感器并在其中设置依赖项。我认为它更干净,但我不确定这是否符合您的要求,即随着依赖项数量的增加,代码的数量会增加。
例子:

dependencies = [('dag1', 'task1'), ('dag2', 'task4'), ('dag2', 'task5'), ('dag3', 'task2')]

other_task = PythonOperator(...)

for dag_id, task_id in dependencies:
    sensor = ExternalTaskSensor(
        task_id='wait_for_{0}.{1}'.format(dag_id, task_id),
        external_dag_id=dag_id,
        external_task_id=task_id,
        dag=dag)
    sensor >> other_task

相关问题