Scheduling Spark jobs running on Kubernetes with Airflow

cig3rfwq · asked 2023-04-29 · Kubernetes

I have a Spark job that runs in Kubernetes pods. So far I have been launching it manually with a YAML file. Now I want to schedule my Spark job through Airflow. This is the first time I am using Airflow and I cannot figure out how to add my YAML file to it. From what I have read, I can schedule jobs via a DAG in Airflow. An example DAG is:

from airflow.operators.python import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {'owner':'test', 'start_date' : datetime(2019, 4, 3), 'retries': 2, 'retry_delay': timedelta(minutes=1) }
dag = DAG('test_dag', default_args = args, catchup=False)

def print_text1():
    print("hell-world1")

def print_text():
    print('Hello-World2')

t1 = PythonOperator(task_id='multitask1', python_callable=print_text1, dag=dag)
t2 = PythonOperator(task_id='multitask2', python_callable=print_text, dag=dag)
t1 >> t2

In this case, once I trigger the DAG, the tasks above run one after the other. Now, if I want to run a spark-submit job instead, what should I do? I am using Spark 2.4.4.


vmdwslir · #1

Airflow has the concept of operators, which represent Airflow tasks. Your example uses the PythonOperator, which simply executes Python code and is most likely not what you are after, unless you submit your Spark job inside that Python code. There are several operators you can use:

  • BashOperator, which executes a given bash script for you. You can use it to run kubectl or spark-submit directly (a minimal sketch of this approach follows the note below)
  • SparkSubmitOperator, a dedicated operator that calls spark-submit for you
  • KubernetesPodOperator, which creates a Kubernetes pod for you; you can use it to launch your driver pod directly
  • A hybrid solution, e.g. HttpOperator + Livy on Kubernetes: you run a Livy server on Kubernetes, which acts as a Spark Job Server and exposes a REST API that is called by Airflow's HttpOperator

Note: for each operator you need to make sure that your Airflow environment contains all the dependencies required for execution, as well as credentials configured to access the required services.
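
For illustration, here is a minimal sketch of the BashOperator approach, assuming Airflow 2.x and a Spark 2.4.4 build with Kubernetes support; the API server address, namespace, container image and jar path are placeholders, not values taken from the question:

from airflow.operators.bash import BashOperator

# Placeholder spark-submit command; replace the API server, namespace, image and jar path
spark_submit_cmd = (
    "spark-submit "
    "--master k8s://https://<k8s-api-server>:6443 "
    "--deploy-mode cluster "
    "--name spark-pi "
    "--class org.apache.spark.examples.SparkPi "
    "--conf spark.kubernetes.namespace=<your-namespace> "
    "--conf spark.kubernetes.container.image=<your-spark-image> "
    "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar"
)

t3 = BashOperator(
    task_id="spark_submit_task",
    bash_command=spark_submit_cmd,
    dag=dag,  # the DAG object from the question's example
)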
You can also refer to the existing threads on this topic.


t40tm48m · #2

As of 2023 there is a new option: the SparkKubernetesOperator, which runs Spark jobs on Kubernetes via the Kubernetes Operator for Apache Spark: https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
In Airflow we can use the SparkKubernetesOperator and provide the Spark job details in a .yaml file. That YAML file creates the driver and executor pods that run the Spark job.
Airflow task:

# Requires the apache-airflow-providers-cncf-kubernetes provider package
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

spark_operator = SparkKubernetesOperator(
    dag=spark_operator_test,                   # DAG object defined elsewhere
    task_id="spark_operator_task",
    application_file='spark_app_config.yaml',  # SparkApplication manifest, see below
    namespace="spark-apps",
    on_success_callback=spark_success_alert,   # custom alerting callbacks
    on_failure_callback=spark_fail_alert
)
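
A common follow-up pattern is to pair the operator with the SparkKubernetesSensor from the same provider, which waits until the SparkApplication reaches a terminal state. The sketch below is illustrative and assumes the application name spark-pi-1 from the YAML file further down:

from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor

# Waits for the SparkApplication submitted above to complete or fail
spark_monitor = SparkKubernetesSensor(
    dag=spark_operator_test,
    task_id="spark_operator_monitor",
    namespace="spark-apps",
    application_name="spark-pi-1",  # metadata.name from the YAML example below
    attach_log=True,                # copy the driver log into the Airflow task log
)
spark_operator >> spark_monitor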

Example YAML file:

# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi-1
  namespace: spark-apps
spec:
  type: Scala
  mode: cluster
  image: "apache/spark-py:v3.4.0"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.4.0.jar"
  sparkVersion: "3.4.0"
  sparkConf:
    "spark.eventLog.enabled": "true"
    "spark.eventLog.dir": "/mnt/data/eventLogs"
    "spark.driver.log.persistToDfs.enabled": "true"
    "spark.driver.log.dfsDir": "/mnt/data/eventLogs"
  restartPolicy:
    type: Never
  volumes:
    - name: "data-volume"
      persistentVolumeClaim:
        claimName: pvc-bifrost-spark-data
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.4.0
    serviceAccount: spark-apps-spark
    # podSecurityContext:
      # fsGroup: 0
    volumeMounts:
      - name: "data-volume"
        mountPath: "/mnt/data"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.4.0
    deleteOnTermination: false
    # podSecurityContext:
      # fsGroup: 0
    volumeMounts:
      - name: "data-volume"
        mountPath: "/mnt/data"
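
A few practical notes: the Kubernetes Operator for Apache Spark linked above must be installed in the cluster so that the SparkApplication resource type exists, and the spark-apps namespace and spark-apps-spark service account referenced here need to be created with the appropriate RBAC permissions. application_file is a templated field, so a relative path such as spark_app_config.yaml is normally resolved against the DAG folder. Once the task has run, you can inspect the result with kubectl get sparkapplications -n spark-apps.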
