python-3.x: Problem triggering a DAG on MWAA

fcg9iug3  posted on 2023-01-22  in Python

I am using MWAA, and when I try to trigger a DAG I sometimes get the error below. I am not sure how to handle it.

def should_retry(response) -> bool:
    """
    Return True to retry the Airflow job; return False otherwise.
    """
    wait_between_s3_requests()
    # No response at all: nothing to inspect, so do not retry.
    if response is None:
        return False
    # Any non-200 status is treated as a failure worth retrying.
    if response.status_code != 200:
        return True
    response_data = decode_response_obj(response)
    print(response_data)
    # "externally triggered: True" in stdout means the DAG run was created.
    if "externally triggered: True" in response_data["stdout"].decode("utf8"):
        print("here")
        send_slack_message("here")
        return False
    # Retry if the CLI wrote anything to stderr. Note that stderr can also hold
    # harmless warnings (such as the DeprecationWarning in the output below).
    return bool(response_data["stderr"])

def _trigger_dag(dag_name: str, *, conf: str, airflow_env: str) -> http.client.HTTPSConnection:
    """Trigger a DAG run in an Airflow environment."""
    mwaa_cli_token = boto3.client("mwaa").create_cli_token(Name=airflow_env)
    connection = http.client.HTTPSConnection(mwaa_cli_token["WebServe
Trigger dag  with error: b'/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/configuration.py:357 DeprecationWarning: The processor_poll_interval option in [scheduler] has been renamed to scheduler_idle_sleep_time - the old setting has been used, but please update your config.\nTraceback (most recent call last):\n  File "/usr/local/airflow/.local/bin/airflow", line 8, in <module>\n    sys.exit(main())\n  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/__main__.py", line 48, in main\n    args.func(args)\n  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command\n    return func(*args, **kwargs)\n  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper\n    return f(*args, **kwargs)\n  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/dag_command.py", line 138, in dag_trigger\n    dag_id=args.dag_id, run_id=args.run_id, conf=args.conf, execution_date=args.exec_date\n  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/api/client/local_client.py", line 30, in trigger_dag\n    dag_id=dag_id, run_id=run_id, conf=conf, execution_date=execution_date\n  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/api/common/experimental/trigger_dag.py", line 125, in trigger_dag\n    replace_microseconds=replace_microseconds,\n  File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/api/common/experimental/trigger_dag.py", line 75, in _trigger_dag\n    f"A Dag Run already exists for dag id {dag_id} at {execution_date} with run id {run_id}"\nairflow.exceptions.DagRunAlreadyExists: A Dag Run already exists for dag id ml_product at 2023-01-14 00:08:19+00:00 with run id manual__2023-01-14T00:08:19+00:00\n'
xtfmy6hx  #1

Reposting the relevant part of the error for readability:

File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/api/common/experimental/trigger_dag.py", 
line 75, in _trigger_dag
airflow.exceptions.DagRunAlreadyExists: 
A Dag Run already exists for dag id ml_product at 2023-01-14 00:08:19+00:00
with run id manual__2023-01-14T00:08:19+00:00

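This means you are trying to trigger a DAG run with exactly the same timestamp as a run that already exists. The timestamp in the error has one-second resolution, so two triggers issued within the same second would collide. Your code snippet is cut off, but I assume it continues with something like this: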

import boto3
import requests

# Request a short-lived CLI token for the MWAA environment.
client = boto3.client('mwaa')
token = client.create_cli_token(Name='your-new-environment-name')
url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
body = 'dags trigger your-target-dag-id'
headers = {
    'Authorization': 'Bearer ' + token['CliToken'],
    'Content-Type': 'text/plain'
}
# POST the Airflow CLI command to the MWAA CLI endpoint.
requests.post(url, data=body, headers=headers)

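In that case, I would consider adding an execution date to the body, set to the current system time (and making sure the function does not run twice at the same moment).
See also the reference for the Airflow `dags trigger` command.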
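
A rough sketch of that idea, not the asker's actual code: `build_trigger_body` and `SameSecondGuard` are hypothetical helper names, and the sketch assumes that the `--exec-date` and `--conf` options of `dags trigger` are accepted by the MWAA CLI endpoint for your Airflow version. Since the timestamp in the error has one-second resolution, the execution date is truncated to whole seconds, and the guard refuses a second trigger within the same second. The guard only protects callers inside a single process.

```python
import shlex
from datetime import datetime, timezone
from typing import Optional


def build_trigger_body(dag_id: str, exec_date: datetime,
                       conf: Optional[str] = None) -> str:
    """Build the plain-text CLI command sent to the MWAA CLI endpoint.

    Hypothetical helper: the flags are assumed to match your Airflow version.
    """
    # Normalise to UTC and drop microseconds (the error shows second resolution).
    stamp = exec_date.astimezone(timezone.utc).replace(microsecond=0).isoformat()
    parts = ["dags", "trigger", dag_id, "--exec-date", stamp]
    if conf is not None:
        parts += ["--conf", conf]
    # Quote each argument so a JSON conf string stays a single argument.
    return " ".join(shlex.quote(p) for p in parts)


class SameSecondGuard:
    """Reject a trigger that falls in the same second as the previous one."""

    def __init__(self) -> None:
        self._last: Optional[datetime] = None

    def claim(self, now: datetime) -> bool:
        second = now.replace(microsecond=0)
        if second == self._last:
            return False  # would reuse the previous execution date
        self._last = second
        return True


guard = SameSecondGuard()
now = datetime.now(timezone.utc)
if guard.claim(now):
    body = build_trigger_body("your-target-dag-id", now)
    # `body` replaces the fixed 'dags trigger your-target-dag-id' string above.
```

If `claim` returns False, the caller can wait a second and try again rather than sending a request that is likely to fail with `DagRunAlreadyExists`.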
