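Goal:
- Use a Python list as a global variable shared between tasks.
- Currently, it crashes at the first task.
- 1.) I am trying to create a simple Python list that is passed from one task to the next, with task 2 appending some string values to it. So the goal is to have one shared list.
- 2.) Even if one task fails, the DAG should just move on and not care (obviously marking that task as failed).
Setup:
- I am using Airflow 2.4.1.
- I use Airflow in Docker and have built a Python environment that I have used many times and that works fine.
My code: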
from __future__ import annotations

import logging
import os
import shutil
import sys
import tempfile
import time
from pprint import pprint

import pendulum
from airflow import DAG
from airflow.decorators import task

log = logging.getLogger(__name__)
PYTHON = sys.executable
BASE_DIR = tempfile.gettempdir()

my_default_args = {
    'owner': 'me',
    'email': ['some_email@some_email.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'write_successes': [],
}

with DAG(
    dag_id='my_dag_id',
    schedule='9 9 * * *',
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    catchup=False,
    default_args=my_default_args,
    tags=['a', 'b'],
) as dag:

    @task.external_python(task_id="one", python='/opt/airflow/venv1/bin/python3')
    def first(**kwargs):
        task_id="one"
        write_successes = kwargs.get('write_successes', [])
        print(write_successes)
        write_successes.append(99)
        print(write_successes)

    @task.external_python(task_id="two", python='/opt/airflow/venv1/bin/python3')
    def second(**kwargs):
        write_successes = kwargs.get('write_successes', [])
        print(write_successes)
        write_successes.append(101)
        print(write_successes)

    one = first()
    two = second()
    one >> two
Error:
*** Reading local file: /opt/airflow/logs/dag_id=test_global_variable/run_id=scheduled__2023-02-05T09:09:00+00:00/task_id=one/attempt=1.log
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.one scheduled__2023-02-05T09:09:00+00:00 [queued]>
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.one scheduled__2023-02-05T09:09:00+00:00 [queued]>
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1362} INFO -
--------------------------------------------------------------------------------
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1364} INFO -
--------------------------------------------------------------------------------
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1383} INFO - Executing <Task(_PythonExternalDecoratedOperator): one> on 2023-02-05 09:09:00+00:00
[2023-02-06, 12:24:43 GMT] {standard_task_runner.py:54} INFO - Started process 239657 to run task
[2023-02-06, 12:24:43 GMT] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_global_variable', 'one', 'scheduled__2023-02-05T09:09:00+00:00', '--job-id', '72751', '--raw', '--subdir', 'DAGS_FOLDER/test_global_variable.py', '--cfg-path', '/tmp/tmpxldmrzpp']
[2023-02-06, 12:24:43 GMT] {standard_task_runner.py:83} INFO - Job 72751: Subtask one
[2023-02-06, 12:24:43 GMT] {dagbag.py:525} INFO - Filling up the DagBag from /opt/airflow/dags/test_global_variable.py
[2023-02-06, 12:24:43 GMT] {task_command.py:384} INFO - Running <TaskInstance: test_global_variable.one scheduled__2023-02-05T09:09:00+00:00 [running]> on host 4851b30aa5cf
[2023-02-06, 12:24:43 GMT] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=me
AIRFLOW_CTX_DAG_ID=test_global_variable
AIRFLOW_CTX_TASK_ID=one
AIRFLOW_CTX_EXECUTION_DATE=2023-02-05T09:09:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2023-02-05T09:09:00+00:00
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'execution_date' from the template is deprecated and will be removed in a future version. Please use 'data_interval_start' or 'logical_date' instead.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'next_ds' from the template is deprecated and will be removed in a future version. Please use '{{ data_interval_end | ds }}' instead.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'next_ds_nodash' from the template is deprecated and will be removed in a future version. Please use '{{ data_interval_end | ds_nodash }}' instead.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'next_execution_date' from the template is deprecated and will be removed in a future version. Please use 'data_interval_end' instead.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'prev_ds' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'prev_ds_nodash' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'prev_execution_date' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'prev_execution_date_success' from the template is deprecated and will be removed in a future version. Please use 'prev_data_interval_start_success' instead.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'tomorrow_ds' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'tomorrow_ds_nodash' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'yesterday_ds' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {warnings.py:109} WARNING - /home/airflow/.local/lib/python3.8/site-packages/airflow/utils/context.py:204: AirflowContextDeprecationWarning: Accessing 'yesterday_ds_nodash' from the template is deprecated and will be removed in a future version.
warnings.warn(_create_deprecation_warning(key, self._deprecation_replacements[key]))
[2023-02-06, 12:24:44 GMT] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/decorators/base.py", line 188, in execute
return_value = super().execute(context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 370, in execute
return super().execute(context=serializable_context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 678, in execute_callable
return self._execute_python_callable_in_subprocess(python_path, tmp_path)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 411, in _execute_python_callable_in_subprocess
self._write_args(input_path)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 381, in _write_args
file.write_bytes(self.pickling_library.dumps({'args': self.op_args, 'kwargs': self.op_kwargs}))
_pickle.PicklingError: Can't pickle <function first at 0x7f80ff76e4c0>: it's not the same object as unusual_prefix_6cc7442bed7c02593e3a29524b0e65329d9f59da_test_global_variable.first
[2023-02-06, 12:24:44 GMT] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=test_global_variable, task_id=one, execution_date=20230205T090900, start_date=20230206T122443, end_date=20230206T122444
[2023-02-06, 12:24:44 GMT] {standard_task_runner.py:102} ERROR - Failed to execute job 72751 for task one (Can't pickle <function first at 0x7f80ff76e4c0>: it's not the same object as unusual_prefix_6cc7442bed7c02593e3a29524b0e65329d9f59da_test_global_variable.first; 239657)
[2023-02-06, 12:24:44 GMT] {local_task_job.py:164} INFO - Task exited with return code 1
[2023-02-06, 12:24:44 GMT] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
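For readers unfamiliar with the `PicklingError` in the traceback above: `pickle` serializes a function by reference (module name plus function name) and refuses if looking that name up no longer yields the same object. The following is a minimal standard-library sketch of that mechanism only. The module name `demo_dag_module` is made up, and the assumption that the DAG above hits this path because the task function ends up inside the pickled `kwargs` while the `@task` decorator has rebound the name `first` is a plausible reading of the log, not something the log confirms.

```python
import pickle
import sys
import types

# Hypothetical stand-in for a DAG file; the module name is invented for this demo.
module = types.ModuleType("demo_dag_module")
sys.modules[module.__name__] = module
exec("def first():\n    return 'hello'\n", module.__dict__)

original = module.first

# While the module attribute still points at the function,
# pickling it by reference succeeds.
pickle.dumps(original)

# Rebind the name to a different object, which is what a decorator
# does to the name of the function it decorates.
module.first = object()

try:
    pickle.dumps(original)
    error_message = None
except pickle.PicklingError as exc:
    # pickle looked up demo_dag_module.first and found a different object.
    error_message = str(exc)

print(error_message)
```

If this is indeed what happens in the DAG, avoiding `**kwargs` in the externally executed function and passing only plain, picklable arguments would sidestep the lookup entirely.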
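I have tried to fix it based on the following posts:
- I tried global Python variables, but that did not work at all.
- Global variables in Airflow - I have separate "task.external_python" tasks, which makes it impossible to use that post.
- Mine is not a class issue - List as global variable inside a class in Python
- This might be of interest, but I have a separate Python venv for each task - https://stackoverflow.com/a/58804409/10270590
- I could not get Airflow XCom to work.
- @TJaniF -> (I retried a second time, but on the first run of the same code I got the following:) I tried the following code. The long bar at the top is marked as failed, but one square below it is marked as success, and there is no square at all beneath that square. I do not understand this.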
from airflow.decorators import dag, task
from pendulum import datetime

@dag(
    dag_id='test_global_variable',
    start_date=datetime(2022,12,10),
    schedule=None,
    catchup=False,)
def write_var():

    @task.external_python(task_id="task_1", python='/opt/airflow/venv1/bin/python3')
    def add_to_list(my_list):
        print(my_list)
        my_list.append(19)
        return my_list

    @task.external_python(task_id="task_2", python='/opt/airflow/venv1/bin/python3')
    def add_to_list_2(my_list):
        print(my_list)
        my_list.append(42)
        return my_list

    add_to_list_2(add_to_list([23, 5, 8]))

write_var()
Log from the successful task:
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.task_1 manual__2023-02-06T15:36:51.225176+00:00 [queued]>
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.task_1 manual__2023-02-06T15:36:51.225176+00:00 [queued]>
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1362} INFO -
--------------------------------------------------------------------------------
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1364} INFO -
--------------------------------------------------------------------------------
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1383} INFO - Executing <Task(_PythonExternalDecoratedOperator): task_1> on 2023-02-06 15:36:51.225176+00:00
[2023-02-06, 15:36:52 GMT] {standard_task_runner.py:54} INFO - Started process 249785 to run task
[2023-02-06, 15:36:52 GMT] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_global_variable', 'task_1', 'manual__2023-02-06T15:36:51.225176+00:00', '--job-id', '72908', '--raw', '--subdir', 'DAGS_FOLDER/test_global_variable.py', '--cfg-path', '/tmp/tmpuw6bfiif']
[2023-02-06, 15:36:52 GMT] {standard_task_runner.py:83} INFO - Job 72908: Subtask task_1
[2023-02-06, 15:36:52 GMT] {dagbag.py:525} INFO - Filling up the DagBag from /opt/airflow/dags/test_global_variable.py
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-06, 15:36:52 GMT] {task_command.py:384} INFO - Running <TaskInstance: test_global_variable.task_1 manual__2023-02-06T15:36:51.225176+00:00 [running]> on host 4851b30aa5cf
[2023-02-06, 15:36:52 GMT] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=test_global_variable
AIRFLOW_CTX_TASK_ID=task_1
AIRFLOW_CTX_EXECUTION_DATE=2023-02-06T15:36:51.225176+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2023-02-06T15:36:51.225176+00:00
[2023-02-06, 15:36:53 GMT] {process_utils.py:179} INFO - Executing cmd: /opt/airflow/venv1/bin/python3 /tmp/tmd35abbbcv/script.py /tmp/tmd35abbbcv/script.in /tmp/tmd35abbbcv/script.out /tmp/tmd35abbbcv/string_args.txt
[2023-02-06, 15:36:53 GMT] {process_utils.py:183} INFO - Output:
[2023-02-06, 15:36:54 GMT] {process_utils.py:187} INFO - [23, 5, 8]
[2023-02-06, 15:36:54 GMT] {python.py:177} INFO - Done. Returned value was: [23, 5, 8, 19]
[2023-02-06, 15:36:54 GMT] {taskinstance.py:1401} INFO - Marking task as SUCCESS. dag_id=test_global_variable, task_id=task_1, execution_date=20230206T153651, start_date=20230206T153652, end_date=20230206T153654
[2023-02-06, 15:36:54 GMT] {local_task_job.py:164} INFO - Task exited with return code 0
[2023-02-06, 15:36:54 GMT] {local_task_job.py:273} INFO - 1 downstream tasks scheduled from follow-on schedule check
1 Answer
I'm curious what you tried with Airflow XCom? The DAG below uses XCom via the TaskFlow API to pass a list from one task to another. Tested on Airflow 2.5.1, but it should work the same on 2.4.1.
Screenshot:
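Independent of that DAG, the handoff itself can be pictured without Airflow. The sketch below is a standard-library simulation, not Airflow code: each hypothetical "task" runs in its own interpreter process (as `@task.external_python` tasks do), so no in-memory list survives between them, and the only way to share the list is to serialize the return value of one task and feed it to the next. JSON over stdin/stdout stands in for the XCom backend here; `run_task` and `TASK_SOURCE` are illustrative names.

```python
import json
import subprocess
import sys

# Source of one simulated task. It runs in a fresh interpreter, so it cannot
# see any variable from the parent process or from another task.
TASK_SOURCE = """
import json, sys
my_list = json.load(sys.stdin)      # receive the upstream value
my_list.append(int(sys.argv[1]))    # do this task's work
json.dump(my_list, sys.stdout)      # "return" the value to the caller
"""


def run_task(upstream_value, value_to_append):
    """Run one simulated task in a separate process and return its result."""
    result = subprocess.run(
        [sys.executable, "-c", TASK_SOURCE, str(value_to_append)],
        input=json.dumps(upstream_value),
        capture_output=True,
        text=True,
        check=True,
    )
    return json.loads(result.stdout)


# Chain the tasks by passing the return value of one into the next,
# mirroring add_to_list_2(add_to_list([23, 5, 8])) from the question.
after_task_1 = run_task([23, 5, 8], 19)
after_task_2 = run_task(after_task_1, 42)
print(after_task_2)  # [23, 5, 8, 19, 42]
```

The point of the simulation is that the list is never mutated in place across tasks: every task receives a copy and must return the updated copy for the next task to see it.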