Goal
- I am running Airflow 2.4.1 in Docker.
- I use an external Python virtual environment to execute each task.
- I have a pandas DataFrame that I want to pass between tasks.
- My earlier question, How to use a python list as global variable python list with in @task.external_python?, worked with a plain Python list, but when I switch to a pandas DataFrame the process crashes.
- The first task runs successfully.
Code
from airflow.decorators import dag, task
from pendulum import datetime
from datetime import timedelta

my_default_args = {
    'owner': 'Anonymus',
    # 'email': ['random@random.com'],
    # 'email_on_failure': True,
    # 'email_on_retry': False,  # only allow if it was allowed in the scheduler
    # 'retries': 1,  # only allow if it was allowed in the scheduler
    # 'retry_delay': timedelta(minutes=1),
    # 'depends_on_past': False,
}

@dag(
    dag_id='test_global_variable',
    schedule='12 11 * * *',
    start_date=datetime(2023, 2, 1, tz="UTC"),
    catchup=False,
    default_args=my_default_args,
    tags=['sample_tag', 'sample_tag2'],
)
def write_var():
    @task.external_python(task_id="task_1", python='/opt/airflow/v1/bin/python3')
    def add_to_list(my_list):
        print(my_list)
        import pandas as pd
        df = pd.DataFrame(my_list)
        return df

    @task.external_python(task_id="task_2", python='/opt/airflow/v1/bin/python3')
    def add_to_list_2(df):
        print(df)
        df = df.append([19])
        return df

    add_to_list_2(add_to_list([23, 5, 8]))

write_var()
Error log of the second task
*** Reading local file: /opt/airflow/logs/dag_id=test_global_variable/run_id=manual__2023-02-07T14:06:17.432734+00:00/task_id=task_2/attempt=1.log
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.task_2 manual__2023-02-07T14:06:17.432734+00:00 [queued]>
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1165} INFO - Dependencies all met for <TaskInstance: test_global_variable.task_2 manual__2023-02-07T14:06:17.432734+00:00 [queued]>
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1362} INFO -
--------------------------------------------------------------------------------
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1363} INFO - Starting attempt 1 of 1
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1364} INFO -
--------------------------------------------------------------------------------
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1383} INFO - Executing <Task(_PythonExternalDecoratedOperator): task_2> on 2023-02-07 14:06:17.432734+00:00
[2023-02-07, 14:06:22 GMT] {standard_task_runner.py:54} INFO - Started process 324831 to run task
[2023-02-07, 14:06:22 GMT] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_global_variable', 'task_2', 'manual__2023-02-07T14:06:17.432734+00:00', '--job-id', '74080', '--raw', '--subdir', 'DAGS_FOLDER/test_global_variable.py', '--cfg-path', '/tmp/tmpbm8tkk1i']
[2023-02-07, 14:06:22 GMT] {standard_task_runner.py:83} INFO - Job 74080: Subtask task_2
[2023-02-07, 14:06:22 GMT] {dagbag.py:525} INFO - Filling up the DagBag from /opt/airflow/dags/test_global_variable.py
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_1>, task_2 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {taskmixin.py:205} WARNING - Dependency <Task(_PythonExternalDecoratedOperator): task_2>, task_1 already registered for DAG: test_global_variable
[2023-02-07, 14:06:22 GMT] {task_command.py:384} INFO - Running <TaskInstance: test_global_variable.task_2 manual__2023-02-07T14:06:17.432734+00:00 [running]> on host 4851b30aa5cf
[2023-02-07, 14:06:22 GMT] {taskinstance.py:1590} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=Anonymus
AIRFLOW_CTX_DAG_ID=test_global_variable
AIRFLOW_CTX_TASK_ID=task_2
AIRFLOW_CTX_EXECUTION_DATE=2023-02-07T14:06:17.432734+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2023-02-07T14:06:17.432734+00:00
[2023-02-07, 14:06:23 GMT] {process_utils.py:179} INFO - Executing cmd: /opt/airflow/venv1/bin/python3 /tmp/tmddiox599m/script.py /tmp/tmddiox599m/script.in /tmp/tmddiox599m/script.out /tmp/tmddiox599m/string_args.txt
[2023-02-07, 14:06:23 GMT] {process_utils.py:183} INFO - Output:
[2023-02-07, 14:06:24 GMT] {process_utils.py:187} INFO - Traceback (most recent call last):
[2023-02-07, 14:06:24 GMT] {process_utils.py:187} INFO - File "/tmp/tmddiox599m/script.py", line 17, in <module>
[2023-02-07, 14:06:24 GMT] {process_utils.py:187} INFO - arg_dict = pickle.load(file)
[2023-02-07, 14:06:24 GMT] {process_utils.py:187} INFO - AttributeError: Can't get attribute '_unpickle_block' on <module 'pandas._libs.internals' from '/opt/airflow/venv1/lib/python3.8/site-packages/pandas/_libs/internals.cpython-38-x86_64-linux-gnu.so'>
[2023-02-07, 14:06:24 GMT] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/decorators/base.py", line 188, in execute
return_value = super().execute(context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 370, in execute
return super().execute(context=serializable_context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 678, in execute_callable
return self._execute_python_callable_in_subprocess(python_path, tmp_path)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 426, in _execute_python_callable_in_subprocess
execute_in_subprocess(
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/process_utils.py", line 168, in execute_in_subprocess
execute_in_subprocess_with_kwargs(cmd, cwd=cwd)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/process_utils.py", line 191, in execute_in_subprocess_with_kwargs
raise subprocess.CalledProcessError(exit_code, cmd)
subprocess.CalledProcessError: Command '['/opt/airflow/venv1/bin/python3', '/tmp/tmddiox599m/script.py', '/tmp/tmddiox599m/script.in', '/tmp/tmddiox599m/script.out', '/tmp/tmddiox599m/string_args.txt']' returned non-zero exit status 1.
[2023-02-07, 14:06:24 GMT] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=test_global_variable, task_id=task_2, execution_date=20230207T140617, start_date=20230207T140622, end_date=20230207T140624
[2023-02-07, 14:06:24 GMT] {standard_task_runner.py:102} ERROR - Failed to execute job 74080 for task task_2 (Command '['/opt/airflow/venv1/bin/python3', '/tmp/tmddiox599m/script.py', '/tmp/tmddiox599m/script.in', '/tmp/tmddiox599m/script.out', '/tmp/tmddiox599m/string_args.txt']' returned non-zero exit status 1.; 324831)
[2023-02-07, 14:06:24 GMT] {local_task_job.py:164} INFO - Task exited with return code 1
[2023-02-07, 14:06:24 GMT] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check
1 Answer
Try using to_dict to serialize the DataFrame to a plain dict, then recreate it in the other task, as sketched below.
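A minimal sketch of this approach, reusing the venv path, task names, and schedule from the question; only the dag_id is hypothetical:

from airflow.decorators import dag, task
from pendulum import datetime

@dag(
    dag_id='test_global_variable_dict',  # hypothetical id for this sketch
    schedule='12 11 * * *',
    start_date=datetime(2023, 2, 1, tz="UTC"),
    catchup=False,
)
def write_var():
    @task.external_python(task_id="task_1", python='/opt/airflow/v1/bin/python3')
    def add_to_list(my_list):
        import pandas as pd
        df = pd.DataFrame(my_list)
        # Return plain built-ins so XCom never pickles pandas internals.
        return df.to_dict()

    @task.external_python(task_id="task_2", python='/opt/airflow/v1/bin/python3')
    def add_to_list_2(df_dict):
        import pandas as pd
        # Rebuild the DataFrame with whatever pandas version this venv ships.
        df = pd.DataFrame.from_dict(df_dict)
        df = pd.concat([df, pd.DataFrame([19])])  # stand-in for the deprecated df.append([19])
        print(df)
        return df.to_dict()

    add_to_list_2(add_to_list([23, 5, 8]))

write_var()

Because only built-in types cross the XCom boundary, it no longer matters that the pandas versions in the Airflow image and the virtualenv differ.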
From the Airflow documentation on serialization for the ExternalPythonOperator (and PythonVirtualenvOperator): your Python callable has to be serializable. Many Python objects cannot be serialized with the standard pickle library. You can mitigate some of these limitations by using the dill library, but even dill does not remove every serialization limitation.
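If you do need to pass objects that plain pickle rejects, the ExternalPythonOperator also accepts a use_dill flag. A hedged sketch, assuming dill is installed at matching versions in both the Airflow environment and the target venv; note this would not cure a pandas version mismatch like the one in the traceback above:

from airflow.decorators import task

@task.external_python(
    task_id="task_1",
    python='/opt/airflow/v1/bin/python3',
    use_dill=True,  # serialize arguments and results with dill instead of pickle
)
def add_to_list(my_list):
    import pandas as pd
    return pd.DataFrame(my_list)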