Kubernetes: PythonVirtualenvOperator gives error: No module named unusual_prefix_*_dag

Posted by gudnpqoy on 2023-11-17 in Kubernetes

I am using Airflow 2.5.3 with the Kubernetes executor and Python 3.7.
I tried to create a simple DAG containing a single PythonVirtualenvOperator that is passed two context variables ({{ ts }} and {{ dag }}).

from datetime import timedelta
from pathlib import Path
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
import pendulum
 
 
dag = DAG(
    default_args={
        'retries': 2,
        'retry_delay': timedelta(minutes=10),
    },
    dag_id='fs_rb_cashflow_test5',
    schedule_interval='0 5 * * 1',
    start_date=pendulum.datetime(2020, 1, 1, tz='UTC'),
    catchup=False,
    tags=['Feature Store', 'RB', 'u_m1ahn'],
    render_template_as_native_obj=True,
)
 
context = {"ts": "{{ ts }}", "dag": "{{ dag }}"}
op_args = [context, Path(__file__).parent.absolute()]
 
 
def make_foo(*args, **kwargs):
    print("---> making foo!")
    print("make foo(...): args")
    print(args)
    print("make foo(...): kwargs")
    print(kwargs)
 
 
make_foo_task = PythonVirtualenvOperator(
        task_id='make_foo',
        python_callable=make_foo,
        provide_context=True,
        use_dill=True,
        system_site_packages=False,
        op_args=op_args,
        op_kwargs={
          "execution_date_str": '{{ execution_date }}',
        },
        requirements=["dill", "pytz", f"apache-airflow=={airflow.__version__}", "psycopg2-binary >= 2.9, < 3"],
        dag=dag)

Alas, when I try to trigger this DAG, Airflow gives me the following error:

[2023-10-23, 13:30:40] {process_utils.py:187} INFO - Traceback (most recent call last):
[2023-10-23, 13:30:40] {process_utils.py:187} INFO -   File "/tmp/venv5ifve2a5/script.py", line 17, in <module>
[2023-10-23, 13:30:40] {process_utils.py:187} INFO -     arg_dict = dill.load(file)
[2023-10-23, 13:30:40] {process_utils.py:187} INFO -   File "/tmp/venv5ifve2a5/lib/python3.7/site-packages/dill/_dill.py", line 287, in load
[2023-10-23, 13:30:40] {process_utils.py:187} INFO -     return Unpickler(file, ignore=ignore, **kwds).load()
[2023-10-23, 13:30:40] {process_utils.py:187} INFO -   File "/tmp/venv5ifve2a5/lib/python3.7/site-packages/dill/_dill.py", line 442, in load
[2023-10-23, 13:30:40] {process_utils.py:187} INFO -     obj = StockUnpickler.load(self)
[2023-10-23, 13:30:40] {process_utils.py:187} INFO -   File "/tmp/venv5ifve2a5/lib/python3.7/site-packages/dill/_dill.py", line 432, in find_class
[2023-10-23, 13:30:40] {process_utils.py:187} INFO -     return StockUnpickler.find_class(self, module, name)
[2023-10-23, 13:30:40] {process_utils.py:187} INFO - ModuleNotFoundError: No module named 'unusual_prefix_4c3a45107010a4223aa054ffc5f7bffc78cce4e7_dag'


Why does it give me this strange error, and how can it be fixed?

ef1yzkbh #1

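This problem occurs when the function make_foo, passed as the python_callable argument to the Airflow operator, is defined in the same Python source file as the DAG object.
When I moved make_foo into a separate Python module, the DAG finally started working.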
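For context on the module name in the traceback: Airflow's DagBag does not appear to import a DAG file under its plain file name. As far as can be told from the Airflow 2.x source, it builds a name from a SHA-1 hash of the file path plus the file's base name. The sketch below reproduces that scheme as an assumption, not as a documented API; `mangled_module_name` and the example path are hypothetical.

```python
import hashlib
import os


def mangled_module_name(filepath: str) -> str:
    """Approximate the module name assigned to a DAG file (assumed scheme)."""
    # Base name of the file without its extension, e.g. "dag" for ".../dag.py".
    org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1])
    # 40-character hex digest of the full path, matching the length of the
    # hash seen in the traceback.
    path_hash = hashlib.sha1(filepath.encode("utf-8")).hexdigest()
    return f"unusual_prefix_{path_hash}_{org_mod_name}"


# Hypothetical location of a DAG file; the real path is not given in the question.
name = mangled_module_name("/opt/airflow/dags/some_folder/dag.py")
print(name)
```

If this reading is right, a function defined in the DAG file carries that generated name as its `__module__`, and that name only exists in the process that parsed the DAG file.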
Here is my code:

  • dags/strange_pickling_error/dag.py
import datetime
import pendulum
import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
import dill

from strange_pickling_error.some_moar_code import make_foo

dag = DAG(
    dag_id='strange_pickling_error_dag',
    schedule_interval='0 5 * * 1',
    start_date=datetime.datetime(2020, 1, 1),
    catchup=False,
    render_template_as_native_obj=True,
)

context = {"ts": "{{ ts }}", "dag_run": "{{ dag_run }}"}

make_foo_task = PythonVirtualenvOperator(
    task_id='make_foo',
    python_callable=make_foo,
    use_dill=True,
    system_site_packages=False,
    op_args=[context],
    requirements=[f"dill=={dill.__version__}", f"apache-airflow=={airflow.__version__}", "psycopg2-binary >= 2.9, < 3",
                  f"pendulum=={pendulum.__version__}", "lazy-object-proxy"],
    dag=dag)


  • dags/strange_pickling_error/some_moar_code.py
def make_foo(*args, **kwargs):
    print("---> making foo!")
    print("make foo(...): args")
    print(args)
    print("make foo(...): kwargs")
    print(kwargs)
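
One plausible reading of the original traceback is that something in the pickled arguments referred to make_foo by module name. Pickle stores a module-level function as a reference (module name plus function name), and the traceback shows dill delegating to StockUnpickler.find_class, the step where the standard unpickler imports that module. The following minimal sketch uses the standard-library pickle rather than dill, and a hypothetical module name, to show the same failure when the module cannot be imported on the loading side.

```python
import pickle
import sys
import types

# Hypothetical stand-in for the generated module name in the traceback.
MODULE_NAME = "unusual_prefix_0000_dag"

# Build a throwaway module that defines make_foo, as the DAG file does.
dag_module = types.ModuleType(MODULE_NAME)
exec("def make_foo(*args, **kwargs):\n    return 'foo'\n", dag_module.__dict__)
sys.modules[MODULE_NAME] = dag_module

# Pickling a module-level function records its module and name, not its code.
payload = pickle.dumps(dag_module.make_foo)

# Simulate a fresh interpreter (such as the virtualenv) that cannot import it.
del sys.modules[MODULE_NAME]
try:
    pickle.loads(payload)
    load_error = None
except ModuleNotFoundError as exc:
    load_error = str(exc)

print(load_error)
```

In the layout above, make_foo belongs to strange_pickling_error.some_moar_code, so any reference to it uses an ordinary module name instead of one generated while parsing the DAG file. Whether that fully explains why the fix works in this setup is an assumption.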
