我有一个airflow DAG,我正在尝试读取存储在airflow UI中的变量(用户名和密码),并将这些变量值作为导出值传递到操作系统中。原因是我正在使用dbt yaml文件,该文件要求我读取环境变量dbt_user
。(唯一的其他方法是在yaml文件中设置密码,这不安全。
default:
target: dev
outputs:
dev:
type: snowflake
account: xxxx
user: "{{ env_var('dbt_user') }}"
我试着写一个dag来导出bashoperator,但是它似乎没有设置环境变量。
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
from airflow.models import Variable
import os
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020,8,1),
'retries': 0
}
with DAG('sample', default_args=default_args, schedule_interval='@once') as dag:
task_1 = BashOperator(
task_id='get_variables',
bash_command='export dbt_user={{ var.value.dbt_user }} ',
env = os.environ.copy(),
dag=dag
)
task_2 = BashOperator(
task_id='load_seed_data_once',
bash_command='echo $dbt_user',
dag=dag
)
task_1 >> task_2
当我尝试回显时,我们可以看到没有设置任何内容。2有人知道如何使用bashoperator设置环境变量吗?
[2021-11-04 12:00:34,452] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo $dbt_user']
[2021-11-04 12:00:34,463] {subprocess.py:74} INFO - Output:
[2021-11-04 12:00:34,464] {subprocess.py:78} INFO -
[2021-11-04 12:00:34,465] {subprocess.py:82} INFO - Command exited with return code 0
[2021-11-04 12:00:34,494] {taskinstance.py:1211} INFO - Marking task as SUCCESS. dag_id=sample, task_id=load_seed_data_once, execution_date=20211104T120032, start_date=20211104T120034, end_date=20211104T120034
[2021-11-04 12:00:34,517] {taskinstance.py:1265} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-11-04 12:00:34,555] {local_task_job.py:149} INFO - Task exited with return code 0
更新:
我也试过通过python操作符来做,但效果不太好。它给了我一个从无KeyError的提升KeyError(key):'变量_1'
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
from airflow.models import Variable
import os
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020,8,1),
'retries': 0
}
def set_env():
os.environ["variable_1"] = "value_1"
def print_env_var():
print(os.environ["variable_1"])
with DAG('sample', default_args=default_args, schedule_interval='@once') as dag:
set_env_task = PythonOperator(
task_id='python_task',
python_callable=set_env,
dag=dag
)
print_env_task = PythonOperator(
task_id='load_seed_data_once',
python_callable=print_env_var,
dag=dag
)
set_env_task >> print_env_task
2条答案
按热度按时间hpcdzsge1#
BashOperator
和PythonOperator
--我认为是任何操作符--启动一个新的子shell,并且只继承映像中的容器中设置的环境,在运行时(例如compose
或k8s deploy
)或在启动airflow
之前通过脚本(例如入口点)。这就是为什么你在
BashOperator
中有env
参数来传递你想为脚本设置的任何env变量的dict,你也可以在模板化env
时从AF变量传递dbt_user
和password。您也可以在dag默认值中设置
env
,使其对所有任务可用,这样您就不需要单独设置。最后,如果使用
LocalExecutor
,可以在第一次bash中执行以下操作:这将使导出的变量可以在任何新的shell中访问。注意,当
KubernetesExecutor
启动一个新的容器时,这将不起作用-但是有一些方法可以解决这个问题。vq8itlhq2#
再加上@Yannick的回答,由于每个操作符通常都在自己的环境中运行,因此必须相应地设置环境。在OP的情况下,即运行
dbt
,这将取决于dbt
是如何执行的,即DockerOperator
、KubernetesPodOperador
、BashOperator
或PythonOperator
等等。这些操作符中的大多数都有一个类似于
env
的参数,可用于在执行期间将环境变量导出到运行时。以下代码片段提供了如何实现此操作的示例:但是,对于
PythonOperator
,不可能通过operator参数设置环境变量,因此,在上面的示例中,使用了一种变通方法来使用op_kwargs
和os
设置环境。