ModuleNotFoundError:没有名为“my_package”的模块,Airflow在其Python环境中找不到my_package

cwdobuhd  于 2023-05-19  发布在  Python
关注(0)|答案(1)|浏览(169)

我在docker容器中设置了DAG,如下所示:

with DAG(
    "my_dag",
    default_args=dag_config,
    schedule="@weekly",
) as dag:

    config_env = BashOperator(
        task_id="config_env",
        bash_command="cd /opt/airflow/include/my_package && python -m pip install -e .",
    )

    @task()
    def get_my_function():
        from my_package import my_function

        my_function()

运行此DAG时,它返回

ModuleNotFoundError: No module named 'my_package'

但是,如果我进入docker容器使用Python shell进行导入,它的工作没有问题。

from my_package import my_function

这表明my_package已成功安装在环境中。
我还检查了Python shell和airflow是否使用相同的Python环境。对于Python shell,我使用which python
对于Airflow DAG,我修改了任务如下:

@task()
def get_my_function():]
    import sys

    print("Python Executable Path:", sys.executable)
    print("Python Version:", sys.version)
    print("sys.path:", sys.path)

    from my_package import my_fucntion

Python可执行文件是相同的/usr/local/bin/python。我还尝试使用PythonOperator代替taskFlow API。同样的错误返回。

  • 气流2.5.1
  • Python 3.9
2hh7jdfx

2hh7jdfx1#

问题是Airflow在./lib/site-packages路径中找不到包源。通过将源程序包代码添加到sys.path中解决了问题,即,
在气流DAG中,填写以下内容:

import sys

sys.path.append("/opt/airflow/include/my_package/src")

with DAG(
    "my_dag",
    default_args=dag_config,
    schedule="@weekly",
) as dag:

    config_env = BashOperator(
        task_id="config_env",
        bash_command="cd /opt/airflow/include/my_package && python -m pip install -e .",
    )

    @task()
    def get_my_function():
        from my_package import my_function

        my_function()

相关问题