我有一个关于跑步的气流项目。
在我的Python代码中,我有一个包含defaultdict的dataclass。当我尝试使用asdict方法时,我得到了一个异常(异常细节包含在下面的bug描述中)。
这似乎是一个开放的bug:https://bugs.python.org/issue35540。
作为解决方案,我编写了一个修补函数来替换asdict函数。我试图找到一个地方,我可以钩这个变化在气流初始化(之前我的dags将运行):
import copy
from collections import defaultdict
from dataclasses import _is_dataclass_instance, fields, asdict
def my_asdict(obj, dict_factory=dict):
if _is_dataclass_instance(obj):
result = []
for f in fields(obj):
value = my_asdict(getattr(obj, f.name), dict_factory)
result.append((f.name, value))
return dict_factory(result)
elif isinstance(obj, (list, tuple)):
return type(obj)(my_asdict(v, dict_factory) for v in obj)
elif isinstance(obj, defaultdict):
# This is the patch
obj = dict(obj)
if isinstance(obj, dict):
return type(obj)((my_asdict(k, dict_factory), my_asdict(v, dict_factory))
for k, v in obj.items())
else:
return copy.deepcopy(obj)
asdict = my_asdict
有办法做到吗?
1条答案
按热度按时间az31mfrm1#
我想我和你在同一个场景中,我需要修补一个操作符,以便在开发中像一个虚拟操作符一样运行它,而不是实际做一些事情,我正在寻找修补它的地方,因为所有的Airflow代码都是从airflow cli开始的,我只是修补了airflow入口点文件,它就像一个魅力
因为我在docker上使用airflow,所以入口点文件在
/home/airflow/.local/bin/airflow
上,我只是将文件更改为