python-3.x Airflow -在初始化期间修补函数

yftpprvb  于 2023-05-13  发布在  Python
关注(0)|答案(1)|浏览(131)

我有一个关于跑步的气流项目。
在我的Python代码中,我有一个包含defaultdict的dataclass。当我尝试使用asdict方法时,我得到了一个异常(异常细节包含在下面的bug描述中)。
这似乎是一个开放的bug:https://bugs.python.org/issue35540
作为解决方案,我编写了一个修补函数来替换asdict函数。我试图找到一个地方,我可以钩这个变化在气流初始化(之前我的dags将运行):

import copy
from collections import defaultdict
from dataclasses import _is_dataclass_instance, fields, asdict

def my_asdict(obj, dict_factory=dict):
    if _is_dataclass_instance(obj):
        result = []
        for f in fields(obj):
            value = my_asdict(getattr(obj, f.name), dict_factory)
            result.append((f.name, value))
        return dict_factory(result)
    elif isinstance(obj, (list, tuple)):
        return type(obj)(my_asdict(v, dict_factory) for v in obj)
    elif isinstance(obj, defaultdict):
        # This is the patch          
        obj = dict(obj)
    if isinstance(obj, dict):
        return type(obj)((my_asdict(k, dict_factory), my_asdict(v, dict_factory))
                         for k, v in obj.items())
    else:
        return copy.deepcopy(obj)

asdict = my_asdict

有办法做到吗?

az31mfrm

az31mfrm1#

我想我和你在同一个场景中,我需要修补一个操作符,以便在开发中像一个虚拟操作符一样运行它,而不是实际做一些事情,我正在寻找修补它的地方,因为所有的Airflow代码都是从airflow cli开始的,我只是修补了airflow入口点文件,它就像一个魅力
因为我在docker上使用airflow,所以入口点文件在/home/airflow/.local/bin/airflow上,我只是将文件更改为

#!/usr/local/bin/python
# -*- coding: utf-8 -*-
import re
import sys
from airflow.__main__ import main

# patch
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator

def dummy(*args, **kwargs):
    pass

GlueCrawlerOperator.execute = dummy

if __name__ == '__main__':
    sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
    sys.exit(main())

相关问题