I'm using Airflow 2.5.3 with the Kubernetes executor and Python 3.7.
I'm trying to create a simple DAG with a single `PythonVirtualenvOperator` that receives two context variables (`{{ ts }}` and `{{ dag }}`).
```python
from datetime import timedelta
from pathlib import Path

import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
import pendulum

dag = DAG(
    default_args={
        'retries': 2,
        'retry_delay': timedelta(minutes=10),
    },
    dag_id='fs_rb_cashflow_test5',
    schedule_interval='0 5 * * 1',
    start_date=pendulum.datetime(2020, 1, 1, tz='UTC'),
    catchup=False,
    tags=['Feature Store', 'RB', 'u_m1ahn'],
    render_template_as_native_obj=True,
)

context = {"ts": "{{ ts }}", "dag": "{{ dag }}"}
op_args = [context, Path(__file__).parent.absolute()]


def make_foo(*args, **kwargs):
    print("---> making foo!")
    print("make foo(...): args")
    print(args)
    print("make foo(...): kwargs")
    print(kwargs)


make_foo_task = PythonVirtualenvOperator(
    task_id='make_foo',
    python_callable=make_foo,
    provide_context=True,
    use_dill=True,
    system_site_packages=False,
    op_args=op_args,
    op_kwargs={
        "execution_date_str": '{{ execution_date }}',
    },
    requirements=["dill", "pytz", f"apache-airflow=={airflow.__version__}", "psycopg2-binary >= 2.9, < 3"],
    dag=dag)
```
Unfortunately, when I trigger this DAG, Airflow gives me the following error:
```
[2023-10-23, 13:30:40] {process_utils.py:187} INFO - Traceback (most recent call last):
[2023-10-23, 13:30:40] {process_utils.py:187} INFO - File "/tmp/venv5ifve2a5/script.py", line 17, in <module>
[2023-10-23, 13:30:40] {process_utils.py:187} INFO - arg_dict = dill.load(file)
[2023-10-23, 13:30:40] {process_utils.py:187} INFO - File "/tmp/venv5ifve2a5/lib/python3.7/site-packages/dill/_dill.py", line 287, in load
[2023-10-23, 13:30:40] {process_utils.py:187} INFO - return Unpickler(file, ignore=ignore, **kwds).load()
[2023-10-23, 13:30:40] {process_utils.py:187} INFO - File "/tmp/venv5ifve2a5/lib/python3.7/site-packages/dill/_dill.py", line 442, in load
[2023-10-23, 13:30:40] {process_utils.py:187} INFO - obj = StockUnpickler.load(self)
[2023-10-23, 13:30:40] {process_utils.py:187} INFO - File "/tmp/venv5ifve2a5/lib/python3.7/site-packages/dill/_dill.py", line 432, in find_class
[2023-10-23, 13:30:40] {process_utils.py:187} INFO - return StockUnpickler.find_class(self, module, name)
[2023-10-23, 13:30:40] {process_utils.py:187} INFO - ModuleNotFoundError: No module named 'unusual_prefix_4c3a45107010a4223aa054ffc5f7bffc78cce4e7_dag'
```
Why does it give me this strange error, and how can it be fixed?
Answer:
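This problem occurs when the `make_foo` function is passed as the `python_callable` argument to an Airflow operator and is defined in the same Python source file as the DAG object. When I moved `make_foo` into a separate Python module, the DAG finally started working. Here is my code: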
`dags/strange_pickling_error/dag.py`:

`dags/strange_pickling_error/some_moar_code.py`:
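A sketch of why the same-file function fails, under stated assumptions. Airflow's DAG parser imports each DAG file under a generated module name of the form `unusual_prefix_<hash>_<file name>`, which is the `unusual_prefix_..._dag` module in the traceback; that module exists only in the process that parsed the file. The operator serializes `op_args`/`op_kwargs` with dill, and with `render_template_as_native_obj=True` the `{{ dag }}` template most likely renders to the DAG object itself, which holds the task and therefore a reference to `make_foo`. Like `pickle`, dill generally stores a function from an importable module as a reference (module name plus function name), so the virtualenv process has to import the generated module while unpickling, and fails. The snippet below reproduces only that mechanism: the standard `pickle` module stands in for dill, the module name `unusual_prefix_0123abcd_dag` is hypothetical, and the "fixed" case assumes the separate module is importable in the child process (arranged here explicitly via `PYTHONPATH`). It is an illustration, not Airflow's code.

```python
import importlib
import os
import pickle
import subprocess
import sys
import tempfile
import types

# The child process unpickles whatever arrives on stdin, roughly like the
# generated script.py does with dill.load(file).
UNPICKLE = "import pickle, sys; pickle.loads(sys.stdin.buffer.read()); print('ok')"

FUNC_SRC = "def make_foo(*args, **kwargs):\n    print('---> making foo!')\n"


def unpickle_in_fresh_interpreter(payload, extra_path=None):
    """Unpickle `payload` in a new Python process; return (returncode, stdout, stderr)."""
    env = dict(os.environ)
    if extra_path:
        # Prepend extra_path while keeping any PYTHONPATH that is already set.
        env["PYTHONPATH"] = os.pathsep.join(p for p in (extra_path, env.get("PYTHONPATH")) if p)
    proc = subprocess.run(
        [sys.executable, "-c", UNPICKLE],
        input=payload, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env,
    )
    return proc.returncode, proc.stdout.decode(), proc.stderr.decode()


# 1) Function defined in a module that exists only in this process's sys.modules,
#    mimicking a DAG file imported under a generated name (hypothetical name).
mangled = "unusual_prefix_0123abcd_dag"
dag_module = types.ModuleType(mangled)
exec(FUNC_SRC, dag_module.__dict__)  # make_foo.__module__ becomes `mangled`
sys.modules[mangled] = dag_module
bad_payload = pickle.dumps(dag_module.make_foo)  # stored as (module name, "make_foo")
code, _, err = unpickle_in_fresh_interpreter(bad_payload)
print("same-file callable fails:", "ModuleNotFoundError" in err)

# 2) The same function moved into a real module file the child can import.
tmp_dir = tempfile.mkdtemp()
with open(os.path.join(tmp_dir, "some_moar_code.py"), "w") as f:
    f.write(FUNC_SRC)
sys.path.insert(0, tmp_dir)
importlib.invalidate_caches()
import some_moar_code  # noqa: E402  (imported after sys.path is extended)

good_payload = pickle.dumps(some_moar_code.make_foo)
code2, out2, _ = unpickle_in_fresh_interpreter(good_payload, extra_path=tmp_dir)
print("separate-module callable:", out2.strip())
```

The key point is that the pickle payload names a module rather than containing the function's code, so the fix only works if the receiving process can import the module that the name points to.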