如何在sparkkubernetesoperator操作符中将执行日期作为参数传递?

1wnzp6jl  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(422)

我在想办法把执行日期传给SparkUberneteSoperator。任何方法都可以传递它,因为我将使用spark运行和s3分区的执行日期。

submit_compaction_to_spark = SparkKubernetesOperator(
        task_id="submit_compaction_to_spark",
        application_file="/k8s/compaction_s3.yml",
        namespace=kubernetes_namespace,
        kubernetes_conn_id="kubernetes",
        params={
            "warehouse_path": s3_path,
            "snapshot_expire_time": execution_date,
            "partition_filter": execution_date,
            "k8s_namespace": kubernetes_namespace,
            "docker_image_tag": docker_image_tag,
        }
m2xkgtsf

m2xkgtsf1#

不幸的是, params 只向jinja公开自定义值,但不呈现其中的jinja模板。
例如,让我们看看这个Python。

op = PythonOperator(
    task_id="my_operator",
    python_callable=lambda**context: print(context['params']),
    params={
        "date": "{{ execution_date }}"
    },
    dag=dag
)

日期键的值是文本字符串 "{{ execution_date }}" 而不是渲染值。

[2021-03-05 01:24:26,770] {logging_mixin.py:103} INFO - {'date': '{{ execution_date }}'}

baseoperator中的params hook允许您将参数和/或对象的字典传递给模板。请花点时间了解参数my_param是如何传递到模板的。
您可以在airflow文档中阅读更多关于jinja使用params模板的信息。
可以使用 execution_date 在其他方面,sparkkubernetesoperator利用这些设置的jinja模板。

template_fields = ['application_file', 'namespace']  
template_ext = ('yaml', 'yml', 'json')

sparkkubernetesoperator有两个模板字段, application_file 以及 namespace ,这意味着您可以使用jinja模板作为值。如果引用具有这些扩展名的文件,它将呈现该文件及其内部的jinja模板。
让我们修改您提供的运算符。

submit_compaction_to_spark = SparkKubernetesOperator(
        task_id="submit_compaction_to_spark",
        application_file="/k8s/compaction_s3.yml",
        namespace=kubernetes_namespace,
        kubernetes_conn_id="kubernetes",
        params={
            "k8s_namespace": kubernetes_namespace,
            "warehouse_path": s3_path,
        }
)

我猜怎么着 /k8s/compaction_s3.yml 看起来像,添加了一些jinja模板。

---
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: "spark-pi-{{ ds }}-{{ task_instance.try_number }}"
  namespace: "{{ params.k8s_namespace }}"
  labels:
    warehouse_path: "{{ params.k8s_namespace }}"
    date: "{{ ds }}"
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.4"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar"
  sparkVersion: "2.4.4"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 2.4.4
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.4
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

可以在dag中检查任务示例的“渲染模板”视图。
另请参考气流文档中的示例dag和示例应用程序文件。

相关问题