python 没有名为unusual_prefix_* 的模块

rvpgvaaj  于 2023-06-28  发布在  Python
关注(0)|答案(3)|浏览(134)

我尝试在Airflow安装中运行Python Operator Example。安装在同一台机器上部署了Web服务器,调度程序和工作程序,并且对所有非PytohnOperator任务都没有任何抱怨。任务失败,并报告无法导入模块“unusual_prefix_*”,其中 * 是包含DAG的文件的名称。
完整的stacktrace:

['/usr/bin/airflow', 'run', 'tutorialpy', 'print_the_context', '2016-08-23T10:00:00', '--pickle', '90', '--local']
[2016-08-29 11:35:20,054] {__init__.py:36} INFO - Using executor CeleryExecutor
[2016-08-29 11:35:20,175] {configuration.py:508} WARNING - section/key [core/fernet_key] not found in config
Traceback (most recent call last):
  File "/usr/bin/airflow", line 90, in <module>
    args.func(args)
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/airflow/bin/cli.py", line 214, in run
    DagPickle).filter(DagPickle.id == args.pickle).first()
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2659, in first
    ret = list(self[0:1])
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2457, in __getitem__
    return list(res)
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 86, in instances
    util.raise_from_cause(err)
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 71, in instances
    rows = [proc(row) for row in fetch]
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 428, in _instance
    loaded_instance, populate_existing, populators)
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 486, in _populate_full
    dict_[key] = getter(row)
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1253, in process
    return loads(value)
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/dill/dill.py", line 260, in loads
    return load(file)
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/dill/dill.py", line 250, in load
    obj = pik.load()
  File "/usr/lib/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/dill/dill.py", line 406, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named unusual_prefix_tutorialpy
[2016-08-29 11:35:20,498: ERROR/Worker-1] Command 'airflow run tutorialpy print_the_context 2016-08-23T10:00:00 --pickle 90 --local ' returned non-zero exit status 1
[2016-08-29 11:35:20,502: ERROR/MainProcess] Task airflow.executors.celery_executor.execute_command[01152b52-044e-4361-888c-ef2d45983c60] raised unexpected: AirflowException('Celery command failed',)
Traceback (most recent call last):
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 45, in execute_command
    raise AirflowException('Celery command failed')
AirflowException: Celery command failed
xmjla07d

xmjla07d1#

使用donot_pickle=True选项(see Documentation)我摆脱了错误。可能不是最好的解决方案,但对我的场景来说已经足够好了。

klh5stk1

klh5stk12#

我在kubernetnes上运行puckel/docker-airlfow时也遇到了这个问题。
trun donot_pickle True或False对我不起作用。
经过一些实验后,我通过从用于启动进程的airlfow schduler/worker参数中删除-p参数来解决这个问题
airflow scheduler -n 5 -p -> airflow scheduler -n 5
airflow worker -p -> airflow worker

mrfwxfqh

mrfwxfqh3#

这可能是由于pickle,也可能是因为你有一个这样的import:

import tutorialpy # or something similar

这会导致代码运行并添加unusual_prefix_

def _load_modules_from_file(self, filepath, safe_mode):
    if not might_contain_dag(filepath, safe_mode):
        # Don't want to spam user with skip messages
        if not self.has_logged:
            self.has_logged = True
            self.log.info("File %s assumed to contain no DAGs. Skipping.", filepath)
        return []

self.log.debug("Importing %s", filepath)
org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1])
path_hash = hashlib.sha1(filepath.encode('utf-8')).hexdigest()
mod_name = f'unusual_prefix_{path_hash}_{org_mod_name}'

if mod_name in sys.modules:
    del sys.modules[mod_name]

def parse(mod_name, filepath):
    try:
        loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
        spec = importlib.util.spec_from_loader(mod_name, loader)
...

相关问题