我正在尝试触发Airflow的DAG并在发布请求中发送参数。
用法示例- DAG有一个任务,该任务仅打印在触发器请求内发送的数字。
触发dag的示例:
import base64
import boto3
import requests
MWAA_ENVIRONMENT_NAME = 'production-mwaa'
dag_name = 'test_param_dag'
mwaa_cli_command = 'dags trigger'
client = boto3.client('mwaa')
mwaa_cli_token = client.create_cli_token(Name=MWAA_ENVIRONMENT_NAME)
mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken']
mwaa_webserver_hostname = f'https://{mwaa_cli_token["WebServerHostname"]}/aws_mwaa/cli'
raw_data = '{0} {1}'.format(mwaa_cli_command, dag_name)
mwaa_response = requests.post(url=mwaa_webserver_hostname,
headers=
{
'Authorization': mwaa_auth_token,
'Content-Type': 'text/plain'
},
params={"x": 11},
data=raw_data)
mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8')
mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8')
print(mwaa_response.status_code)
print('err:\n' + mwaa_std_err_message)
print('out:\n' + mwaa_std_out_message)
dag应该打印'x'参数的值--〉11。
1条答案
按热度按时间zkure5ic1#
您可以使用“{{ dag_run.conf ['x'] }}”访问dag中的参数'x'