shell 如何在Airflow中设置操作系统环境变量

roejwanj  于 2023-03-13  发布在  Shell
关注(0)|答案(2)|浏览(224)

我有一个airflow DAG,我正在尝试读取存储在airflow UI中的变量(用户名和密码),并将这些变量值作为导出值传递到操作系统中。原因是我正在使用dbt yaml文件,该文件要求我读取环境变量dbt_user。(唯一的其他方法是在yaml文件中设置密码,这不安全。

default:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: xxxx
      user: "{{ env_var('dbt_user') }}"

我试着写一个dag来导出bashoperator,但是它似乎没有设置环境变量。

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
from airflow.models import Variable
import os

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020,8,1),
    'retries': 0
}

with DAG('sample', default_args=default_args, schedule_interval='@once') as dag:
    task_1 = BashOperator(
        task_id='get_variables',
        bash_command='export dbt_user={{ var.value.dbt_user }} ',
        env = os.environ.copy(),
        dag=dag
    )

    task_2 = BashOperator(
        task_id='load_seed_data_once',
        bash_command='echo $dbt_user',
        dag=dag
    )

task_1 >> task_2

当我尝试回显时,我们可以看到没有设置任何内容。2有人知道如何使用bashoperator设置环境变量吗?

[2021-11-04 12:00:34,452] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo $dbt_user']
[2021-11-04 12:00:34,463] {subprocess.py:74} INFO - Output:
[2021-11-04 12:00:34,464] {subprocess.py:78} INFO - 
[2021-11-04 12:00:34,465] {subprocess.py:82} INFO - Command exited with return code 0
[2021-11-04 12:00:34,494] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=sample, task_id=load_seed_data_once, execution_date=20211104T120032, start_date=20211104T120034, end_date=20211104T120034
[2021-11-04 12:00:34,517] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-11-04 12:00:34,555] {local_task_job.py:149} INFO - Task exited with return code 0

更新:
我也试过通过python操作符来做,但效果不太好。它给了我一个从无KeyError的提升KeyError(key):'变量_1'

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
from airflow.models import Variable
import os

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020,8,1),
    'retries': 0
}

def set_env():
    os.environ["variable_1"] = "value_1"

def print_env_var():
    print(os.environ["variable_1"])

with DAG('sample', default_args=default_args, schedule_interval='@once') as dag:
    set_env_task = PythonOperator(
        task_id='python_task', 
        python_callable=set_env,
        dag=dag
    )

    print_env_task = PythonOperator(
        task_id='load_seed_data_once',
        python_callable=print_env_var,
        dag=dag
    )

set_env_task >> print_env_task
hpcdzsge

hpcdzsge1#

BashOperatorPythonOperator--我认为是任何操作符--启动一个新的子shell,并且只继承映像中的容器中设置的环境,在运行时(例如composek8s deploy)或在启动airflow之前通过脚本(例如入口点)。
这就是为什么你在BashOperator中有env参数来传递你想为脚本设置的任何env变量的dict,你也可以在模板化env时从AF变量传递dbt_user和password。

env={'dbt_user': '{{ var.value.dbt_user }}'}

您也可以在dag默认值中设置env,使其对所有任务可用,这样您就不需要单独设置。
最后,如果使用LocalExecutor,可以在第一次bash中执行以下操作:

echo "export dbt_user={{ var.value.dbt_user }} >> ~/.bashrc

这将使导出的变量可以在任何新的shell中访问。注意,当KubernetesExecutor启动一个新的容器时,这将不起作用-但是有一些方法可以解决这个问题。

vq8itlhq

vq8itlhq2#

再加上@Yannick的回答,由于每个操作符通常都在自己的环境中运行,因此必须相应地设置环境。在OP的情况下,即运行dbt,这将取决于dbt是如何执行的,即DockerOperatorKubernetesPodOperadorBashOperatorPythonOperator等等。
这些操作符中的大多数都有一个类似于env的参数,可用于在执行期间将环境变量导出到运行时。以下代码片段提供了如何实现此操作的示例:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client.models import V1EnvVar
from datetime import datetime
import os

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 0
}

def python_function(MY_VAR):
    os.environ['MY_VAR'] = MY_VAR
    # do something which requires env var set
    ...

with DAG('sample', default_args=default_args, schedule_interval='@once') as dag:
    k8s_task = KubernetesPodOperator(
        task_id='python_task',
        env_vars=[V1EnvVar(name='MY_VAR', value='value')],
        ...
    )

    docker_task = DockerOperator(
        task_id='docker_task',
        image="alpine",
        ...
        environment={'MY_VAR': 'value'},
    )

    bash_task = BashOperator(
        task_id='bash_task',
        env={'MY_VAR': 'value'},
        bash_command="echo $MY_VAR",
    )

    python_task = PythonOperator(
        task_id='python_task',
        op_kwargs={'MY_VAR': 'value'},
        python_callable=python_function,
    )

但是,对于PythonOperator,不可能通过operator参数设置环境变量,因此,在上面的示例中,使用了一种变通方法来使用op_kwargsos设置环境。

相关问题