python-3.x Celery -如何在Chord中使用Chunks?

owfi6suc  于 2023-04-08  发布在  Python
关注(0)|答案(1)|浏览(196)

chord()中的任务是否可以作为chunk运行?目前我有一个chord实现,它可以很好地处理较小的任务列表,但对于较大的任务,它最终会创建40 k-50 k个任务。我想防止这种情况,因为它似乎工作效率低下,特别是在最后试图收集所有结果时。
我尝试了这个https://stackoverflow.com/a/50019022/5918981,但它对我不起作用。还尝试在task_name.chunks().apply_async()apply_async()内添加回调作为替代方案,但这给了我这个错误:“无法将链接添加到组:使用和弦”。
任何帮助将不胜感激!

91zkwejq

91zkwejq1#

我不知道为什么回调不能直接添加到chunks调用中,但是这个示例代码在类似的情况下对我有效。最重要的是我将chunks转换为一个组

def handle_name(name):
    data = {}
    # handle name
    return data

def do_stuff_callback(results):
    # results will have the following structure in this case: [[{},{}],[{},{}],....]
    pass

def do_stuff():
    chunk_size = 2  # dummy value
    names = [......]  # a very big list
    tasks = [handle_name.chunks(names, chunk_size).group()]
    chord(tasks)(do_stuff_callback.s())

相关问题