我正在使用 MWAA,当我尝试触发一个 DAG 时,有时会收到下面这个错误。我不确定该如何处理这个错误。
def should_retry(response) -> bool:
    """Decide whether the Airflow job trigger should be retried.

    Returns ``True`` to retry the Airflow job and ``False`` to stop.

    Decision order:

    1. No response object at all -> ``False``.
    2. Non-200 HTTP status -> ``True``.
    3. HTTP 200 whose decoded stdout contains ``"externally triggered: True"``
       -> a Slack message is sent and ``False`` is returned.
    4. Any other HTTP 200 -> ``True`` when the decoded ``stderr`` entry is
       non-empty, ``False`` otherwise.

    Args:
        response: Object exposing ``status_code``; it is handed to
            ``decode_response_obj``. May be ``None``.

    Returns:
        ``True`` if the caller should retry, ``False`` otherwise.
    """
    wait_between_s3_requests()

    # Guard before any dereference: the original checked for None only after
    # it had already decoded the response and read its status code.
    if response is None:
        return False

    # Decode once and reuse the result below.
    # NOTE(review): assumes decode_response_obj returns a mapping with a bytes
    # "stdout" entry and a "stderr" entry, and that a second call would yield
    # the same data -- confirm against its definition.
    response_data = decode_response_obj(response)
    print(response_data)

    # A failed HTTP call is retried without inspecting stdout/stderr.
    if response.status_code != 200:
        return True

    if "externally triggered: True" in response_data["stdout"].decode("utf8"):
        print("here")
        send_slack_message("here")
        return False

    # NOTE(review): any non-empty stderr requests a retry, whatever its
    # content -- confirm that is intended for warnings and for
    # "already exists"-style errors.
    return bool(response_data["stderr"])
def _trigger_dag(dag_name: str, *, conf: str, airflow_env: str) -> http.client.HTTPSConnection:
"""Trigger a DAG run in an Airflow environment."""
mwaa_cli_token = boto3.client("mwaa").create_cli_token(Name=airflow_env)
connection = http.client.HTTPSConnection(mwaa_cli_token["WebServe
Trigger dag with error: b'/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/configuration.py:357 DeprecationWarning: The processor_poll_interval option in [scheduler] has been renamed to scheduler_idle_sleep_time - the old setting has been used, but please update your config.\nTraceback (most recent call last):\n File "/usr/local/airflow/.local/bin/airflow", line 8, in <module>\n sys.exit(main())\n File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/__main__.py", line 48, in main\n args.func(args)\n File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command\n return func(*args, **kwargs)\n File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 92, in wrapper\n return f(*args, **kwargs)\n File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/dag_command.py", line 138, in dag_trigger\n dag_id=args.dag_id, run_id=args.run_id, conf=args.conf, execution_date=args.exec_date\n File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/api/client/local_client.py", line 30, in trigger_dag\n dag_id=dag_id, run_id=run_id, conf=conf, execution_date=execution_date\n File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/api/common/experimental/trigger_dag.py", line 125, in trigger_dag\n replace_microseconds=replace_microseconds,\n File "/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/api/common/experimental/trigger_dag.py", line 75, in _trigger_dag\n f"A Dag Run already exists for dag id {dag_id} at {execution_date} with run id {run_id}"\nairflow.exceptions.DagRunAlreadyExists: A Dag Run already exists for dag id ml_product at 2023-01-14 00:08:19+00:00 with run id manual__2023-01-14T00:08:19+00:00\n'
1条答案
按热度按时间xtfmy6hx1#
重新发布错误的相关部分以提高可读性:
这意味着你正在尝试重新运行一个DAG,而它的时间戳与某个已经存在的DAG运行完全相同。你的代码片段被截断了,但我假设它接下来是这样的内容:
在这种情况下,我会考虑向
body
添加一个执行日期,即当前系统时间(并确保函数不会在同一时间运行两次)。另请参阅 Airflow `dags trigger` 命令的参考文档。