我目前正在将DAG从airflow版本1.10.10迁移到2.0.0。
这个DAG使用一个自定义的python操作符,根据任务的复杂性,动态分配资源。问题是v1.10.10(airflow.contrib.kubernetes.pod import Resources)中使用的导入不再起作用。我读到,对于v2.0.0,我应该使用kubernetes.client.models.V1ResourceRequirements,但我需要动态构建这个资源对象。这可能听起来很愚蠢,但我还没有找到构建这个对象的正确方法。
例如,我试过
self.resources = k8s.V1ResourceRequirements(
request_memory=get_k8s_resources_mapping(resource_request)['memory'],
limit_memory=get_k8s_resources_mapping(resource_request)['memory_l'],
request_cpu=get_k8s_resources_mapping(resource_request)['cpu'],
limit_cpu=get_k8s_resources_mapping(resource_request)['cpu_l']
)
字符串
或者是
self.resources = k8s.V1ResourceRequirements(
requests={'cpu': get_k8s_resources_mapping(resource_request)['cpu'],
'memory': get_k8s_resources_mapping(resource_request)['memory']},
limits={'cpu': get_k8s_resources_mapping(resource_request)['cpu_l'],
'memory': get_k8s_resources_mapping(resource_request)['memory_l']}
)
型
(get_k8s_resources_mapping(resource_request)['xxxx']只是根据resource_request返回一个值,比如'2Gi'表示内存,'2'表示cpu)
但它们似乎不起作用。任务失败。
所以,我的问题是,你如何在Python中正确构建V1ResourceRequirements?它在任务示例的executor_config属性中应该是什么样子?像这样的,也许?
'resources': {'limits': {'cpu': '1', 'memory': '512Mi'}, 'requests': {'cpu': '1', 'memory': '512Mi'}}
型
1条答案
按热度按时间hmtdttj41#
正确的语法是:
对于apache-airflow-providers-cncf-kubernetes>=5.3.0:
字符串
如果您想动态生成它,只需将requests/limits中的值替换为返回expected string的函数。
下面是代码在早期版本上工作所需的更改。
对于apache-airflow-providers-cncf-kubernetes <5.3.0 and >=4.2.0:
将导入路径更改为:
型
对于apache-airflow-providers-cncf-kubernetes<4.2.0:
将
container_resources
更改为resources