我正在尝试在我的Flask应用程序上运行RQ/RQ-Worker。我试着把它简化成一个非常简单的测试用例。大致思路是这样的:
1.用户访问/test
页面。它触发要排队的作业并返回排队作业的job_key
。
1.工作进程(worker.py
)处理排队的作业。
1.然后,用户可以访问/retrieve/<job_key>
页面来检索结果。[未显示。]
现在的工作就是做2 + 2的加法。
下面是应用程序代码:
from rq import Queue
from rq.job import Job
# import conn from worker.py
from worker import conn
app = Flask(__name__)
q = Queue(connection=conn)
def add():
return 2+2
@app.route('/test')
def test():
job = q.enqueue_call(func="add", args=None, result_ttl=5000)
return job.get_id()
if __name__ == "__main__":
app.run()
worker.py
源代码如下所示:
from redis import StrictRedis
from rq import Worker, Queue, Connection
listen = ['default']
redis_url = 'redis://localhost:6379'
conn = StrictRedis.from_url(redis_url)
if __name__ == "__main__":
with Connection(conn):
worker = Worker(list(map(Queue, listen)))
worker.work()
据我所知,应用程序代码不是问题所在。我可以访问/test
页面,该页面将使作业入队。但是,当我运行worker时,我得到以下错误:
Traceback (most recent call last):
File "/home/<>/dev/sched/venv/lib/python3.5/site-packages/rq/worker.py", line 588, in perform_job
rv = job.perform()
File "/home/<>/dev/sched/venv/lib/python3.5/site-packages/rq/job.py", line 498, in perform
self._result = self.func(*self.args, **self.kwargs)
File "/home/<>/dev/sched/venv/lib/python3.5/site-packages/rq/job.py", line 206, in func
return import_attribute(self.func_name)
File "/home/<>/dev/sched/venv/lib/python3.5/site-packages/rq/utils.py", line 149, in import_attribute
module_name, attribute = name.rsplit('.', 1)
ValueError: not enough values to unpack (expected 2, got 1)
我觉得像那句台词:
worker = Worker(list(map(Queue, listen)))
是问题只是b/c的性质的错误,但我不知道如何修复它。特别是B/c I've seen other projects,它们似乎使用了完全相同的worker源代码。
我的技术堆栈是:
- flask (0.11.1)
- Redis(2.10.5)
- RQ(0.6.0)
- RQ-Worker(0.0.1)
编辑:
我开始觉得这是一个bug。查看RQ源代码中的此问题单:issue #531。
2条答案
按热度按时间bvuwiixz1#
对我来说,这个问题是由RQ无法解决我的工人模块造成的。
解决方案是向enqueue提供“限定”名称,例如:
6ljaweal2#
我自己刚刚遇到这个问题,但像@Moby Duck这样的解决方案不起作用,我能够将其缩小到循环导入错误(
rq
不会将错误报告为循环导入错误,而只是blanketAttributeError
或类似的东西)。如果您怀疑是循环导入,或者@Moby Duck的样式解决方案不起作用,请尝试:
python
或python3
)ImportError
警告,如果您确实有循环导入)