python-3.x 使用带参数的后请求触发气流DAG

uplii1fm  于 2022-11-19  发布在  Python
关注(0)|答案(1)|浏览(144)

我正在尝试触发Airflow的DAG并在发布请求中发送参数。
用法示例- DAG有一个任务,该任务仅打印在触发器请求内发送的数字。
触发dag的示例:

import base64
import boto3
import requests

MWAA_ENVIRONMENT_NAME = 'production-mwaa'
dag_name = 'test_param_dag'
mwaa_cli_command = 'dags trigger'

client = boto3.client('mwaa')

mwaa_cli_token = client.create_cli_token(Name=MWAA_ENVIRONMENT_NAME)

mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
mwaa_webserver_hostname = f'https://{mwaa_cli_token["WebServerHostname"]}/aws_mwaa/cli'
raw_data = '{0} {1}'.format(mwaa_cli_command, dag_name)

mwaa_response = requests.post(url=mwaa_webserver_hostname,
                          headers=
                          {
                              'Authorization': mwaa_auth_token,
                              'Content-Type': 'text/plain'
                          },
                          params={"x": 11},
                          data=raw_data)

mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8')
mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8')

print(mwaa_response.status_code)
print('err:\n' + mwaa_std_err_message)
print('out:\n' + mwaa_std_out_message)

dag应该打印'x'参数的值--〉11。

zkure5ic

zkure5ic1#

您可以使用“{{ dag_run.conf ['x'] }}”访问dag中的参数'x'

相关问题