我有一个通过Kubernetes pod运行的spark job。到目前为止,我一直使用Yaml文件手动运行我的作业。现在,我想通过气流安排我的Spark作业。这是我第一次使用气流,我无法弄清楚如何在气流中添加我的Yaml文件。从我所读到的是,我可以安排我的工作,通过一个DAG在气流。一个dag示例是:
from airflow.operators import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
args = {'owner':'test', 'start_date' : datetime(2019, 4, 3), 'retries': 2, 'retry_delay': timedelta(minutes=1) }
dag = DAG('test_dag', default_args = args, catchup=False)
def print_text1():
print("hell-world1")
def print_text():
print('Hello-World2')
t1 = PythonOperator(task_id='multitask1', python_callable=print_text1, dag=dag)
t2 = PythonOperator(task_id='multitask2', python_callable=print_text, dag=dag)
t1 >> t2
在这种情况下,一旦我玩DAG,上述方法将在其他方法之后执行。现在,如果我想运行一个spark提交作业,我应该怎么做?我用的是Spark 2。4.4
2条答案
按热度按时间vmdwslir1#
Airflow有一个operators的概念,表示Airflow任务。在您的示例中使用了PythonOperator,它只是执行Python代码,并且很可能不是您感兴趣的代码,除非您在Python代码中提交Spark作业。您可以使用以下几种操作符:
kubectl
或spark-submit
spark-submit
的特定运算符注意:对于每个操作员,您需要确保您的Airflow环境包含执行所需的所有依赖项以及配置为访问所需服务的凭据。
您也可以参考现有的线程:
t40tm48m2#
截至2023年,我们有了新的选项,可以使用“SparkKubernetesOperator”在kubernetes上运行spark job。https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
在气流中,我们可以使用“SparkKubernetesOperator”并在“中提供Spark作业详细信息。yaml”文件。YAML文件将创建驱动程序和执行器pod来运行spark作业
气流任务:
YAML文件示例: