DBT Airflow DAG JSON使用Kubernetes操作符覆盖

9rnv2umw  于 2023-11-17  发布在  Kubernetes
关注(0)|答案(1)|浏览(200)

创建了一个DAG来访问覆盖。下面是在覆盖中传递的命令。JSON on Airflow UI : {"dbt_command": ["products", "--full-refresh"]}
这些命令是可访问的,但它的格式是列表字符串。我如何转换或删除类型字符串和列表方括号在“产品”和“--full-refresh?

风量日志输出:

['dbt', 'run', '--select', "['products', '--full-refresh']", '--target', 'stage']

预期输出:

['dbt', 'run', '--select', 'products', '--full-refresh', '--target', 'stage']

原编码:

Dbt_overrides_Reporting_Task = KubernetesPodOperator(
    namespace="airflow",
    service_account_name="airflow",
    task_id="overrides",
    image="dbt-snowflake:latest",
    cmds = ["dbt", "run", "--select", "{{dag_run.conf.get('dbt_command', '') if dag_run}}" , "--target", "stage"],
    provide_context=True,
    dag_run_conf_overrides_params=True,
    dag = dag,
    is_delete_operator_pod=True,
    name=f"{DAG_NAME}.dag",
    secrets=[keys_env],
    configmaps=[f"{CHART_NAME}-configmap"],
    in_cluster=True,
    get_logs=True,
    env_vars={
        "ENV": os.environ["ENV"],
    },
)

字符串
有没有人可以帮助我解决这个问题?我如何在kubernetes operator中转换cmds中的dag_run.conf jinja模板?

smdncfj3

smdncfj31#

cmds = ["dbt", "run", "--select", "{{dag_run.conf.get('dbt_command_part1', '') if dag_run}}", "{{dag_run.conf.get('dbt_command_part2', '') if dag_run}}" , "--target", "stage"],

字符串
并提供{“dbt_command_part1”:“产品”,“dbt_command_part2”:“--full-refresh”]}
应该能解决你的问题

相关问题