I have a DAG that executes several commands stored in a JSON file (most of them are Python scripts run with arguments). The DAG is structured roughly like this:
import json
from datetime import timedelta

import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

def execute_command_and_store_logs_in_mongodb():
    blablabla
def load_commands(ti, **kwargs):
    with open('./data.json') as f:
        commands = json.load(f)
    # Maximum size -> 48kb
    ti.xcom_push(key='data', value=commands)
with DAG(
    'my_dag',
    default_args=default_args,
    schedule_interval='0 6 * * *',
    start_date=pendulum.yesterday(),
    description="My DAG",
) as dag:
    load_commands_task = PythonOperator(
        task_id='load_json',
        python_callable=load_commands,
    )
    execute_commands_task = PythonOperator(
        task_id='run_python_scripts',
        python_callable=execute_command_and_store_logs_in_mongodb,
    )

    load_commands_task >> execute_commands_task
The problem is in execute_commands_task, which calls the execute_command_and_store_logs_in_mongodb function:
def execute_command_and_store_logs_in_mongodb(ti, **kwargs):
    """Pull the command list from XCom and run each command with a BashOperator."""
    # Loop through the commands and trigger the BashOperators
    commands = ti.xcom_pull(task_ids='load_json', key='data')
    for command in commands:
        task_id = f'id_{command}'  # don't pay attention to this id
        # Execute the command
        bash_op = BashOperator(
            task_id=task_id,
            bash_command=f'python {command}',
            retries=3,
            retry_delay=timedelta(seconds=5),
            dag=dag,
        )
        bash_op.execute(context=kwargs)
        # HERE, I want to print the output of each command and assign it to a
        # variable (to store in a DB later, or send via email); it helps me know
        # whether any Python script raised an exception, etc.
How can I do this? I'm using Airflow 2.4.2. Thanks.
I tried bash_op.log and xcom_pull, but neither of them works.
1 Answer
Maybe you can use subprocess here instead.
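A minimal sketch of that idea, assuming the same XCom layout as in the question: subprocess.run replaces the dynamically created BashOperator, so each command's stdout, stderr, and exit code land in a plain Python variable you can store or email later (the mongo_collection handle in the final comment is hypothetical):

import shlex
import subprocess

def execute_command_and_store_logs_in_mongodb(ti, **kwargs):
    """Run each command in a subprocess and keep its full output."""
    commands = ti.xcom_pull(task_ids='load_json', key='data')
    results = []
    for command in commands:
        # capture_output=True collects stdout/stderr; text=True decodes bytes to str
        proc = subprocess.run(
            shlex.split(f'python {command}'),
            capture_output=True,
            text=True,
        )
        print(proc.stdout)  # also lands in the Airflow task log
        results.append({
            'command': command,
            'returncode': proc.returncode,  # non-zero => the script failed
            'stdout': proc.stdout,
            'stderr': proc.stderr,  # tracebacks from the script show up here
        })
    # Store `results` in MongoDB or send them via email here, e.g.:
    # mongo_collection.insert_many(results)  # hypothetical collection handle

Two trade-offs to be aware of: this drops the per-command retries=3 that the BashOperator gave you, so check proc.returncode and raise (letting the whole task retry) or wrap the call in your own retry loop; and note that in Airflow 2 the BashOperator only returns/pushes the last line of stdout anyway, which is why it is awkward for capturing full logs.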