We have a data engineering project in which we orchestrate tasks and workflows with Airflow on a Kubernetes cluster, using the SparkKubernetesOperator and SparkKubernetesSensor.
The problem is that task contention sometimes prevents our infrastructure from triggering all the operators and their sensors in perfect sync. Occasionally a SparkKubernetesOperator fires, its sensor sits in the queue and takes a long time to start, and by the time the sensor finally runs, the SparkApplication created by the operator is already gone. The sensor then fails with a 404 because it can no longer find the operator's pod or its logs.
Clearly there is an infrastructure bottleneck behind this, but fixing the infrastructure is not something we can do right away, so we are looking for workarounds that could help in the short to medium term.
The first option is to increase the `timeToLiveSeconds` parameter on the SparkApplication, to raise the chance that the sensor finds the application and its logs in time. However, we are not sure whether keeping pods alive longer would prevent them from releasing node resources, creating yet another infrastructure bottleneck.
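As a sketch of that first option: assuming the parameter in question is the Spark Operator's `spec.timeToLiveSeconds` field (which controls how long a *terminated* SparkApplication object is kept before garbage collection), it would be set in the application manifest that the operator submits. All names and values below are placeholders:

```yaml
# Hypothetical SparkApplication manifest; names, image, and namespace
# are placeholders for illustration only.
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: dts-best-choices-perfil-verificado-promo
  namespace: spark-jobs
spec:
  type: Python
  mode: cluster
  image: "example/spark:3.3"
  mainApplicationFile: "local:///app/job.py"
  # Keep the terminated SparkApplication (and thus its status and logs)
  # around for 30 minutes so a late-starting sensor can still find it:
  timeToLiveSeconds: 1800
```

Note that this TTL applies after the application terminates, so it should not keep driver/executor pods holding compute resources the whole time; the lingering object is mostly API-server state. Still, verify the resource impact in your own cluster before relying on it.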
The second option is to somehow force the sensor to run together with the operator, so that the two can only be triggered at the same time rather than one after the other. That way, only failures related to the job code itself would occur, not failures caused by the lack of synchronization between operator and sensor. The big problem is that we don't know whether there is a way to do this; if there is, we haven't found it.
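Short of fusing the two tasks, the failure mode itself can be softened: instead of letting the very first 404 kill the sensor, a subclassed sensor could treat 404s inside a startup grace window as "not ready yet" and keep poking. Below is a minimal, framework-free sketch of that idea; `NotFoundError` and `poke_with_grace` are hypothetical names standing in for the Kubernetes `ApiException` and the sensor's `poke` method:

```python
import time


class NotFoundError(Exception):
    """Stand-in for the Kubernetes 404 ApiException."""


def poke_with_grace(poke, grace_seconds, now=time.monotonic):
    """Wrap a poke callable so it tolerates 404s for `grace_seconds`.

    `poke` is any zero-argument callable that returns True when the
    watched resource has finished, and raises NotFoundError when the
    resource does not exist (yet). Inside the grace window a 404 is
    treated as "keep waiting" (return False); after the window it is
    re-raised, since by then the resource is presumed genuinely gone.
    """
    deadline = now() + grace_seconds

    def wrapped():
        try:
            return poke()
        except NotFoundError:
            if now() < deadline:
                return False  # resource not created yet: keep poking
            raise             # grace period expired: real failure

    return wrapped
```

In Airflow this logic would live in a `SparkKubernetesSensor` subclass whose `poke()` catches the 404 `ApiException` and returns `False` until the grace period expires, so a sensor that starts before (or long after) its operator does not fail instantly.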
Any suggestions?
The 404 ("not accessible") error from the sensor's task log:
[2023-05-05, 13:24:15 UTC] {taskinstance.py:1300} INFO - Executing <Task(SparkKubernetesSensor): sensor_perfil_verificado_promo> on 2023-05-04 06:00:00+00:00
[2023-05-05, 13:24:15 UTC] {standard_task_runner.py:55} INFO - Started process 20 to run task
[2023-05-05, 13:24:15 UTC] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'pipeline_best_choices', 'sensor_perfil_verificado_promo', 'scheduled__2023-05-04T06:00:00+00:00', '--job-id', '1280973', '--raw', '--subdir', 'DAGS_FOLDER/best-choices/dag-best-choices.py', '--cfg-path', '/tmp/tmpqeronp7b']
[2023-05-05, 13:24:15 UTC] {standard_task_runner.py:83} INFO - Job 1280973: Subtask sensor_perfil_verificado_promo
[2023-05-05, 13:24:15 UTC] {task_command.py:388} INFO - Running <TaskInstance: pipeline_best_choices.sensor_perfil_verificado_promo scheduled__2023-05-04T06:00:00+00:00 [running]> on host pipeline-best-choices-sensor-p-3534f4895f7a478a812050aa582b6258
[2023-05-05, 13:24:16 UTC] {logging_mixin.py:137} WARNING - /home/airflow/.local/lib/python3.7/site-packages/airflow/kubernetes/kube_config.py:39 DeprecationWarning: The delete_worker_pods option in [kubernetes] has been moved to the delete_worker_pods option in [kubernetes_executor] - the old setting has been used, but please update your config.
[2023-05-05, 13:24:16 UTC] {logging_mixin.py:137} WARNING - /home/airflow/.local/lib/python3.7/site-packages/airflow/kubernetes/kube_config.py:41 DeprecationWarning: The delete_worker_pods_on_failure option in [kubernetes] has been moved to the delete_worker_pods_on_failure option in [kubernetes_executor] - the old setting has been used, but please update your config.
[2023-05-05, 13:24:16 UTC] {logging_mixin.py:137} WARNING - /home/airflow/.local/lib/python3.7/site-packages/airflow/kubernetes/kube_config.py:44 DeprecationWarning: The worker_pods_creation_batch_size option in [kubernetes] has been moved to the worker_pods_creation_batch_size option in [kubernetes_executor] - the old setting has been used, but please update your config.
[2023-05-05, 13:24:16 UTC] {pod_generator.py:424} WARNING - Model file does not exist
[2023-05-05, 13:24:16 UTC] {taskinstance.py:1509} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=
AIRFLOW_CTX_DAG_ID=pipeline_best_choices
AIRFLOW_CTX_TASK_ID=sensor_perfil_verificado_promo
AIRFLOW_CTX_EXECUTION_DATE=2023-05-04T06:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=2
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2023-05-04T06:00:00+00:00
[2023-05-05, 13:24:16 UTC] {spark_kubernetes.py:105} INFO - Poking: dts-best-choices-perfil-verificado-promo-2023-05-05-12-42-06
[2023-05-05, 13:24:16 UTC] {base.py:73} INFO - Using connection ID 'kubernetes_default' for task execution.
[2023-05-05, 13:24:16 UTC] {taskinstance.py:1768} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 316, in get_custom_object
group=group, version=version, namespace=namespace, plural=plural, name=name
File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api/custom_objects_api.py", line 1484, in get_namespaced_custom_object
return self.get_namespaced_custom_object_with_http_info(group, version, namespace, plural, name, **kwargs) # noqa: E501
File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api/custom_objects_api.py", line 1605, in get_namespaced_custom_object_with_http_info
collection_formats=collection_formats)
File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 353, in call_api
_preload_content, _request_timeout, _host)
File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 184, in __call_api
_request_timeout=_request_timeout)
File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 377, in request
headers=headers)
File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 244, in GET
query_params=query_params)
File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 234, in request
raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Audit-Id': '41bcdf27-da7e-453b-a344-0fa82807af51', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 'ef02cbb8-3a78-4692-86aa-30128c75e9f8', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'c8942bd1-20bc-4c28-b1b9-400500c532fc', 'Date': 'Fri, 05 May 2023 13:24:16 GMT', 'Content-Length': '366'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"sparkapplications.sparkoperator.k8s.io \"dts-best-choices-perfil-verificado-promo-2023-05-05-12-42-06\" not found","reason":"NotFound","details":{"name":"dts-best-choices-perfil-verificado-promo-2023-05-05-12-42-06","group":"sparkoperator.k8s.io","kind":"sparkapplications"},"code":404}
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/sensors/base.py", line 199, in execute
poke_return = self.poke(context)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py", line 111, in poke
namespace=self.namespace,
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 320, in get_custom_object
raise AirflowException(f"Exception when calling -> get_custom_object: {e}\n")
airflow.exceptions.AirflowException: Exception when calling -> get_custom_object: (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Audit-Id': '41bcdf27-da7e-453b-a344-0fa82807af51', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 'ef02cbb8-3a78-4692-86aa-30128c75e9f8', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'c8942bd1-20bc-4c28-b1b9-400500c532fc', 'Date': 'Fri, 05 May 2023 13:24:16 GMT', 'Content-Length': '366'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"sparkapplications.sparkoperator.k8s.io \"dts-best-choices-perfil-verificado-promo-2023-05-05-12-42-06\" not found","reason":"NotFound","details":{"name":"dts-best-choices-perfil-verificado-promo-2023-05-05-12-42-06","group":"sparkoperator.k8s.io","kind":"sparkapplications"},"code":404}
[2023-05-05, 13:24:16 UTC] {taskinstance.py:1323} INFO - Marking task as FAILED. dag_id=pipeline_best_choices, task_id=sensor_perfil_verificado_promo, execution_date=20230504T060000, start_date=20230505T132415, end_date=20230505T132416
[2023-05-05, 13:24:16 UTC] {standard_task_runner.py:105} ERROR - Failed to execute job 1280973 for task sensor_perfil_verificado_promo (Exception when calling -> get_custom_object: (404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Audit-Id': '41bcdf27-da7e-453b-a344-0fa82807af51', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': 'ef02cbb8-3a78-4692-86aa-30128c75e9f8', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'c8942bd1-20bc-4c28-b1b9-400500c532fc', 'Date': 'Fri, 05 May 2023 13:24:16 GMT', 'Content-Length': '366'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"sparkapplications.sparkoperator.k8s.io \"dts-best-choices-perfil-verificado-promo-2023-05-05-12-42-06\" not found","reason":"NotFound","details":{"name":"dts-best-choices-perfil-verificado-promo-2023-05-05-12-42-06","group":"sparkoperator.k8s.io","kind":"sparkapplications"},"code":404}
; 20)
[2023-05-05, 13:24:16 UTC] {local_task_job.py:208} INFO - Task exited with return code 1
[2023-05-05, 13:24:16 UTC] {taskinstance.py:2578} INFO - 0 downstream tasks scheduled from follow-on schedule check
1 Answer

pengsaosao1:
Same problem here. Has anyone come up with a good solution? For now, I just increase `max_active_runs` and `max_active_tasks` to improve the chance that the SparkKubernetesSensor "catches" its matching SparkKubernetesOperator before it dies, but that is a very poor architecture for preventing this race condition.
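For reference, those limits are ordinary DAG arguments, and pairing them with `mode="reschedule"` and a few `retries` on the sensor also helps it survive a transient 404. A sketch, assuming a recent 2.x Airflow with the cncf.kubernetes provider; the DAG id, schedule, namespace, file paths, and the XCom template for the application name are placeholders you would adapt to your setup:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import (
    SparkKubernetesOperator,
)
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import (
    SparkKubernetesSensor,
)

with DAG(
    dag_id="pipeline_best_choices",            # placeholder
    start_date=datetime(2023, 5, 1),
    schedule_interval="0 6 * * *",
    max_active_runs=1,       # one run at a time: fewer competing sensors
    max_active_tasks=8,      # cap concurrent tasks within a run
    catchup=False,
) as dag:
    submit = SparkKubernetesOperator(
        task_id="submit_perfil_verificado_promo",
        namespace="spark-jobs",                             # placeholder
        application_file="spark/perfil_verificado_promo.yaml",  # placeholder
    )
    sensor = SparkKubernetesSensor(
        task_id="sensor_perfil_verificado_promo",
        namespace="spark-jobs",                             # placeholder
        # Common pattern: pull the submitted application's name from XCom.
        application_name=(
            "{{ task_instance.xcom_pull("
            "task_ids='submit_perfil_verificado_promo')"
            "['metadata']['name'] }}"
        ),
        attach_log=True,
        mode="reschedule",       # free the worker slot between pokes
        poke_interval=60,
        retries=3,               # retry instead of dying on a transient 404
        retry_delay=timedelta(minutes=2),
    )
    submit >> sensor
```

This does not remove the race; it only narrows the window in which the sensor can miss its SparkApplication, which matches the answer's "bad architecture, but it works for now" assessment.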