json How to print and store BashCommand output in Airflow?

Posted by wydwbb8l on 2023-03-24 in Other

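I have a DAG that executes multiple commands stored in a JSON file (most of them are Python scripts run with arguments). The DAG is structured roughly like this: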

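import json
from datetime import timedelta

import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

def execute_command_and_store_logs_in_mongodb():
    ...  # body shown further down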

def load_commands(ti, **kwargs):
    with open('./data.json') as f:
        commands = json.load(f)
    # Maximum size -> 48kb
    ti.xcom_push(key='data', value=commands)

with DAG(
    'my_dag',
    default_args=default_args,
    schedule_interval='0 6 * * *',
    start_date=pendulum.yesterday(),
    description=("My DAG")
    ) as dag:

        # Airflow 2 passes the task context (ti, etc.) to the callable automatically,
        # so the deprecated provide_context=True argument is not needed.
        load_commands_task = PythonOperator(
            task_id='load_json',
            python_callable=load_commands,
            dag=dag
        )

        execute_commands_task = PythonOperator(
            task_id='run_python_scripts',
            python_callable=execute_command_and_store_logs_in_mongodb,
            dag=dag
        )
        load_commands_task >> execute_commands_task

The problem is in "execute_commands_task", which calls the execute_command_and_store_logs_in_mongodb function, defined as:

def execute_command_and_store_logs_in_mongodb(ti, **kwargs):
    """Run every command loaded by the 'load_json' task through a BashOperator."""
    # Loop through the commands and trigger the BashOperators
    commands = ti.xcom_pull(task_ids='load_json', key='data')
    for command in commands:
        task_id = f'id_{command}'  # don't pay attention to this id
        # Execute the command
        bash_op = BashOperator(
            task_id=task_id,
            bash_command=f'python {command}',  # f-string, so {command} is actually substituted
            retries=3,
            retry_delay=timedelta(seconds=5),
            dag=dag
        )
        bash_op.execute(context=kwargs)
        # HERE, I want to print the output of each command and assign it to a variable
        # (to store in a DB later, send via email); it helps me know whether any
        # Python script raised an exception, etc.

How can I do this? I am using Airflow 2.4.2. Thanks.
I tried bash_op.log and I tried pulling from XCom, but neither worked.

Answer #1 (lo8azlld)

Maybe you could use subprocess here instead of a BashOperator.

import subprocess

result = subprocess.run("python command here", shell=True, capture_output=True)
print(result.stdout)
print(result.stderr)
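
Applied to the loop in the question, the same idea might look roughly like the sketch below. It is only an illustration under some assumptions: each entry in the JSON file is taken to be a string such as "script.py --arg value" (as the question's `python {command}` suggests), and the function name `run_commands` and the dictionary keys are made up for this example. It uses `text=True` so the captured output comes back as `str` rather than `bytes`, and passes an argument list instead of `shell=True`.

```python
import shlex
import subprocess
import sys


def run_commands(commands):
    """Run each command with the current Python interpreter and collect its output.

    `commands` is assumed to be a list of strings such as "script.py --arg value".
    The function name and the result keys are hypothetical, chosen for this sketch.
    """
    results = []
    for command in commands:
        # shlex.split turns the string into an argument list, so no shell is involved.
        completed = subprocess.run(
            [sys.executable, *shlex.split(command)],
            capture_output=True,
            text=True,  # decode stdout/stderr to str
        )
        results.append(
            {
                "command": command,
                "returncode": completed.returncode,
                "stdout": completed.stdout,
                "stderr": completed.stderr,
            }
        )
    return results
```

Inside the PythonOperator callable, this could be called as `run_commands(ti.xcom_pull(task_ids='load_json', key='data'))`. A non-zero `returncode` together with a traceback in `stderr` indicates that a script raised an exception, and the collected dictionaries can then be printed, written to a database, or emailed.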
