我在docker容器中设置了DAG,如下所示:
with DAG(
"my_dag",
default_args=dag_config,
schedule="@weekly",
) as dag:
config_env = BashOperator(
task_id="config_env",
bash_command="cd /opt/airflow/include/my_package && python -m pip install -e .",
)
@task()
def get_my_function():
from my_package import my_function
my_function()
运行此DAG时,它返回
ModuleNotFoundError: No module named 'my_package'
但是,如果我进入docker容器使用Python shell进行导入,它的工作没有问题。
from my_package import my_function
这表明my_package已成功安装在环境中。
我还检查了Python shell和airflow是否使用相同的Python环境。对于Python shell,我使用which python
。
对于Airflow DAG,我修改了任务如下:
@task()
def get_my_function():]
import sys
print("Python Executable Path:", sys.executable)
print("Python Version:", sys.version)
print("sys.path:", sys.path)
from my_package import my_fucntion
Python可执行文件是相同的/usr/local/bin/python
。我还尝试使用PythonOperator
代替taskFlow API。同样的错误返回。
- 气流2.5.1
- Python 3.9
1条答案
按热度按时间2hh7jdfx1#
问题是Airflow在
./lib/site-packages
路径中找不到包源。通过将源程序包代码添加到sys.path
中解决了问题,即,在气流DAG中,填写以下内容: