看起来好像消息没有被正确地放到队列中。
我使用Django和Celery以及Kombu来使用Django自己的数据库作为Broker后端。我所需要的只是一个非常简单的发布/订阅设置。它最终会部署到Heroku,所以我使用foreman在本地运行。下面是相关的代码和信息:
画中画冻结
Django==1.4.2
celery==3.0.15
django-celery==3.0.11
kombu==2.5.6
过程文件
web: source bin/activate; python manage.py run_gunicorn -b 0.0.0.0:$PORT -w 4; python manage.py syncdb
celeryd: python manage.py celeryd -E -B --loglevel=INFO
设置.py
# Celery configuration
import djcelery
CELERY_IMPORTS = ("api.tasks",)
BROKER_URL = "django://localhost//"
djcelery.setup_loader()
放置消息
with Connection(settings.BROKER_URL) as conn:
queue = conn.SimpleQueue('celery')
queue.put(id)
queue.close()
API/tasks.py
@task()
def process_next_task():
with Connection(settings.BROKER_URL) as conn:
queue = conn.SimpleQueue('celery')
message = queue.get(block=True, timeout=1)
id = int(message.payload)
try:
Model.objects.get(id=id)
except Model.DoesNotExist:
message.reject()
else:
# Do stuff here
message.ack()
queue.close()
在终端中,foreman start
工作正常,显示如下:
started with pid 31835
17:08:22 celeryd.1 | started with pid 31836
17:08:22 web.1 | /usr/local/foreman/bin/foreman-runner: line 41: exec: source: not found
17:08:22 web.1 | 2013-02-14 17:08:22 [31838] [INFO] Starting gunicorn 0.16.1
17:08:22 web.1 | 2013-02-14 17:08:22 [31838] [INFO] Listening at: http://0.0.0.0:5000 (31838)
17:08:22 web.1 | 2013-02-14 17:08:22 [31838] [INFO] Using worker: sync
17:08:22 web.1 | 2013-02-14 17:08:22 [31843] [INFO] Booting worker with pid: 31843
17:08:22 web.1 | 2013-02-14 17:08:22 [31844] [INFO] Booting worker with pid: 31844
17:08:22 web.1 | 2013-02-14 17:08:22 [31845] [INFO] Booting worker with pid: 31845
17:08:22 web.1 | 2013-02-14 17:08:22 [31846] [INFO] Booting worker with pid: 31846
17:08:22 celeryd.1 | [2013-02-14 17:08:22,858: INFO/Beat] Celerybeat: Starting...
17:08:22 celeryd.1 | [2013-02-14 17:08:22,870: WARNING/MainProcess] celery@myhost.local ready.
17:08:22 celeryd.1 | [2013-02-14 17:08:22,873: INFO/MainProcess] consumer: Connected to django://localhost//.
17:08:42 celeryd.1 | [2013-02-14 17:08:42,926: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
17:08:42 celeryd.1 | The full contents of the message body was: body: 25 (2b) {content_type:u'application/json' content_encoding:u'utf-8' delivery_info:{u'priority': 0, u'routing_key': u'celery', u'exchange': u'celery'}}
最后两行不会立即显示,但是当我的API接收到运行上面put_message部分中代码的POST请求时会显示出来。我曾尝试使用Kombu的完全扩展的Producer和Consumer类,结果相同。
Kombu的简单队列示例:http://kombu.readthedocs.org/en/latest/userguide/examples.html#hello-world-example
celery 文件:http://docs.celeryproject.org/en/latest/index.html
有什么想法吗?
已编辑
在procfile中更改为--loglevel=DEBUG
会将终端输出更改为以下内容:
08:54:33 celeryd.1 | started with pid 555
08:54:33 web.1 | started with pid 554
08:54:33 web.1 | /usr/local/foreman/bin/foreman-runner: line 41: exec: source: not found
08:54:36 web.1 | 2013-02-15 08:54:36 [557] [INFO] Starting gunicorn 0.16.1
08:54:36 web.1 | 2013-02-15 08:54:36 [557] [INFO] Listening at: http://0.0.0.0:5000 (557)
08:54:36 web.1 | 2013-02-15 08:54:36 [557] [INFO] Using worker: sync
08:54:36 web.1 | 2013-02-15 08:54:36 [564] [INFO] Booting worker with pid: 564
08:54:36 web.1 | 2013-02-15 08:54:36 [565] [INFO] Booting worker with pid: 565
08:54:36 web.1 | 2013-02-15 08:54:36 [566] [INFO] Booting worker with pid: 566
08:54:36 web.1 | 2013-02-15 08:54:36 [567] [INFO] Booting worker with pid: 567
08:54:37 celeryd.1 | [2013-02-15 08:54:37,480: DEBUG/MainProcess] [Worker] Loading modules.
08:54:37 celeryd.1 | [2013-02-15 08:54:37,484: DEBUG/MainProcess] [Worker] Claiming components.
08:54:37 celeryd.1 | [2013-02-15 08:54:37,484: DEBUG/MainProcess] [Worker] Building boot step graph.
08:54:37 celeryd.1 | [2013-02-15 08:54:37,484: DEBUG/MainProcess] [Worker] New boot order: {ev, queues, beat, pool, mediator, autoreloader, timers, state-db, autoscaler, consumer}
08:54:37 celeryd.1 | [2013-02-15 08:54:37,489: DEBUG/MainProcess] Starting celery.beat._Process...
08:54:37 celeryd.1 | [2013-02-15 08:54:37,490: DEBUG/MainProcess] celery.beat._Process OK!
08:54:37 celeryd.1 | [2013-02-15 08:54:37,491: DEBUG/MainProcess] Starting celery.concurrency.processes.TaskPool...
08:54:37 celeryd.1 | [2013-02-15 08:54:37,491: INFO/Beat] Celerybeat: Starting...
08:54:37 celeryd.1 | [2013-02-15 08:54:37,506: DEBUG/MainProcess] celery.concurrency.processes.TaskPool OK!
08:54:37 celeryd.1 | [2013-02-15 08:54:37,507: DEBUG/MainProcess] Starting celery.worker.mediator.Mediator...
08:54:37 celeryd.1 | [2013-02-15 08:54:37,507: DEBUG/MainProcess] celery.worker.mediator.Mediator OK!
08:54:37 celeryd.1 | [2013-02-15 08:54:37,507: DEBUG/MainProcess] Starting celery.worker.consumer.BlockingConsumer...
08:54:37 celeryd.1 | [2013-02-15 08:54:37,508: WARNING/MainProcess] celery@myhost.local ready.
08:54:37 celeryd.1 | [2013-02-15 08:54:37,508: DEBUG/MainProcess] consumer: Re-establishing connection to the broker...
08:54:37 celeryd.1 | [2013-02-15 08:54:37,510: INFO/MainProcess] consumer: Connected to django://localhost//.
08:54:37 celeryd.1 | [2013-02-15 08:54:37,628: DEBUG/Beat] Current schedule:
08:54:37 celeryd.1 | <Entry: celery.backend_cleanup celery.backend_cleanup() {<crontab: * 4 * * * (m/h/d/dM/MY)>}
08:54:37 celeryd.1 | [2013-02-15 08:54:37,629: DEBUG/Beat] Celerybeat: Ticking with max interval->5.00 minutes
08:54:37 celeryd.1 | [2013-02-15 08:54:37,658: DEBUG/Beat] Celerybeat: Waking up in 5.00 minutes.
08:54:38 celeryd.1 | [2013-02-15 08:54:38,110: DEBUG/MainProcess] consumer: basic.qos: prefetch_count->16
08:54:38 celeryd.1 | [2013-02-15 08:54:38,126: DEBUG/MainProcess] consumer: Ready to accept tasks!
08:55:08 celeryd.1 | [2013-02-15 08:55:08,184: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
08:55:08 celeryd.1 | The full contents of the message body was: body: 26 (2b) {content_type:u'application/json' content_encoding:u'utf-8' delivery_info:{u'priority': 0, u'routing_key': u'celery', u'exchange': u'celery'}}
5条答案
按热度按时间goqiplq21#
问题有两个方面:
消息格式错误。它需要是一个字典,符合@asksol提供的http://docs.celeryproject.org/en/latest/internals/protocol.html文档,并遵循该页底部的示例。
放置消息
Procfile进程是运行任务的消费者,因此不需要在任务中设置消费者,我只需要使用发布消息时发送的参数。
API/tasks.py
ufj5ltwl2#
**这不是此问题的解决方案,
但在使用celery4.0.2时标记为问题**
输出如:
解决方案:https://github.com/celery/celery/issues/3675
感谢https://github.com/ask
w80xi6nr3#
显然librabbitmq问题与celepie 4.x中新的默认协议有关。如果您使用Django,您可以通过在设置中放置
CELERY_TASK_PROTOCOL = 1
或在celeryconf.py
中设置app.conf.task_protocol = 1
来切换到以前的协议版本然后,您可以将中的任务与另一个任务进行排队。
pxiryf3j4#
或使用
或者你可以用第二个版本
来自kombu文档
mdfafbf15#
对于未来的人,
在任务体中,必须分配“task”和“id”值,因为消费者的源代码是这样写的:
http://www.pythondoc.com/celery-3.1.11/_modules/celery/worker/consumer.html