python-3.x 在Airflow中使用“BaseBranchOperator”的多重继承

hc2pp10m  于 2023-01-06  发布在  Python
关注(0)|答案(1)|浏览(155)

可以在Airflow中使用BaseBranchOperator来使用多重继承吗?
我想定义一个运算符,如下所示:

from airflow.models import BaseOperator
from airflow.operators.branch import BaseBranchOperator

class MyOperator(BaseOperator, BaseBranchOperator):

    def execute(self, context):
        print('hi')

    def choose_branch(self, context):
        if True:
            return 'task_A'
        else:
            return 'task_B'

在这种情况下,认为execute方法将在choose_branch方法之前运行是否准确?

fkaflof6

fkaflof61#

这样不行的
在Airflow中,每个操作符都有设置操作符逻辑的执行函数。在BaseBranchOperator的情况下,execute函数利用choose_branch并处理如何跳过任务的逻辑,因此所有用户要做的只是说出要跳过的任务,这在choose_branch中完成:

def choose_branch(self, context: Context) -> str | Iterable[str]:
    """
    Abstract method to choose which branch to run.

    Subclasses should implement this, running whatever logic is
    necessary to choose a branch and returning a task_id or list of
    task_ids.

    :param context: Context dictionary as passed to execute()
    """
    raise NotImplementedError

因此,当您想要实现自己的分支运算符时,您所需要做的就是从BaseBranchOperator继承并覆盖choose_branch函数。
你可以决定你不想要这个机制,你想建立自己的分支逻辑,这意味着你需要实现如何跳过任务的逻辑。在这种情况下,你将实现MyBaseBranchOperator,然后你的实际分支操作符(在你的例子MyOperator)将是:

class MyOperator(MyBaseBranchOperator):
    ...

我认为您真正需要的是pre_execute(),它正好在调用execute()之前触发
所以你可能想要的是:

class MyOperator(BaseBranchOperator):

    def pre_execute(self, context):
        print('hi')

    def choose_branch(self, context):
        if True:
            return 'task_A'
        else:
            return 'task_B'

相关问题