根据celery 的结果发送到工人?

dgsult0t  于 2021-06-24  发布在  Storm
关注(0)|答案(2)|浏览(373)

我最近一直在使用storm,它包含一个叫做 fields grouping (afaict与 group() celery 中的概念),其中带有特定密钥的消息将始终路由到同一个工作进程。
为了更清楚地理解我的意思,这里是StormWiki。
字段分组:流按分组中指定的字段进行分区。例如,如果流按“user id”字段分组,则具有相同“user id”的元组将始终转到同一个任务,但是具有不同“user id”的元组可能转到不同的任务。
例如,从单词列表中读取,我想将以a、b、c开头的单词路由到worker process,d、e、f再路由到另一个,等等。
希望这样做的一个原因可能是因为我希望一个进程负责对一组相同的数据进行数据库读写,这样进程之间就不会有竞争条件。
我正在努力找出最好的方法来实现这一点在celery 。
到目前为止,我的最佳解决方案是为每个“组”(例如letters.a、letters.d)使用一个队列,并确保工作进程的数量与队列的数量完全匹配。缺点是每个工人只能运行一个进程,以及各种情况,如工人死亡或工人被添加/删除。
我对celery 不熟悉,所以如果我提到的概念不正确,请纠正我。

thigvfpy

thigvfpy1#

有一点胶水,但这是一个概念:
有一种方法可以通过 CELERY_WORKER_DIRECT . 设置为 True 创建到每个辅助进程的路由。
我定期使用 celery.current_app.control.inspect().ping() 或确定活动主机。例如。:

>>> hosts = sorted(celery.current_app.control.inspect().ping().keys())
['host5', 'host6']

当我需要通过一个键进行路由时,我会对值进行散列,然后按工作者的数量进行模运算。这将平均分配任务,并将同一个密钥分配给同一个工作者。例如。:

>>> host_id = hash('hello') % len(hosts)
1
>>> host = hosts[host_id]
'host6'

然后在执行任务时,我只需指定交换密钥和路由密钥,如下所示:

my_task.apply_async(exchange='C.dq', routing_key=host)

有几个缺点:
从我所看到的情况来看,在一个worker上设置一个大于1的并发性将使每个进程从同一个进程中消耗,从而否定了整个练习。不幸的是把它保持在1。
如果工人倒下 ping() 和一个 apply_async ,消息将被发送到不存在的路由。对此的修复方法是捕获超时、重新插入可用主机、重新刷新并重新发送。

iyr7buue

iyr7buue2#

celery 的意义在于,你不需要管理单个工人。
如果需要任务取得数据的所有权,则任务应在运行开始时取得所有权。
如果你想单独管理工人,可能不要用celery 。您应该自己编写worker,并使用消息队列(或者storm)。为正确的工作使用正确的工具。

相关问题