我想我对celery 的和弦回调有意见。我在里面搜索了这么多,但找不到任何有用的信息。
让我展示一些代码,然后解释这个问题。
我有以下celery :
execution_chord = chord(
[
tasks.run_web_scanners.si(information).set(queue='fast_queue'),
tasks.run_ip_scans.si(information).set(queue='slow_queue')
],
body=tasks.on_demand_scan_finished.si().set(queue='fast_queue'),
immutable = True
)
execution_chord.apply_async(queue='fast_queue', interval=100)
现在, run_web_scanners
以及 run_ip_scans
两者都有和弦,这和弦不是在异步模式下调用的。主要是因为一个线程产生更多线程的想法是一个灾难的配方。
以下是 run_web_scanners
例如。
@shared_task
def run_web_scanners(scan_information):
# Deep copy just in case
web_information = copy.deepcopy(scan_information)
execution_chord =chord(
[
# Fast_scans
header_scan_task.s(web_information).set(queue='fast_queue')
],
body=web_security_scan_finished.si().set(queue='fast_queue'),
immutable=True)()
return
扫描运行得很好(这些扫描有一个指示成功的body函数),但大多数时候我在回调时都会遇到超时错误 on_demand_scan_finished
被称为。
你知道会发生什么吗? run_web_scanners
以及 run_ip_scans
与所呈现的和弦的“构建”方式相同,身体功能每次都很好。
这是我收到的错误信息
[2020-06-22 18:12:07,397: ERROR/ForkPoolWorker-3] Chord '79af732d-8590-4a1a-bc3b-c76ad8edb742' raised: timeout()
...
...
/lib/python3.7/site-packages/kombu/transport/redis.py", line 382, in get
raise Empty()
_queue.Empty
注意:我搜索了redis错误,但是考虑到它发生的情况,我认为这不是问题所在(可能是其他原因)
编辑:
满栈错误
[2020-06-24 14:05:48,419: ERROR/ForkPoolWorker-3] Chord '5f9d48d7-6531-4da7-97f6-fc530a8d649f' raised: timeout()
Traceback (most recent call last):
File "/home/handerllon/.local/share/virtualenvs/project-6NphltMm/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 963, in drain_events
get(self._deliver, timeout=timeout)
File "/home/handerllon/.local/share/virtualenvs/project-6NphltMm/lib/python3.7/site-packages/kombu/transport/redis.py", line 382, in get
raise Empty()
_queue.Empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/handerllon/.local/share/virtualenvs/project-6NphltMm/lib/python3.7/site-packages/celery/app/builtins.py", line 83, in unlock_chord
propagate=True,
File "/home/handerllon/.local/share/virtualenvs/project-6NphltMm/lib/python3.7/site-packages/celery/result.py", line 822, in join_native
on_message, on_interval):
File "/home/handerllon/.local/share/virtualenvs/project-6NphltMm/lib/python3.7/site-packages/celery/backends/base.py", line 644, in iter_native
on_message=on_message, on_interval=on_interval,
File "/home/handerllon/.local/share/virtualenvs/project-6NphltMm/lib/python3.7/site-packages/celery/backends/amqp.py", line 280, in get_many
wait(timeout=timeout)
File "/home/handerllon/.local/share/virtualenvs/project-6NphltMm/lib/python3.7/site-packages/kombu/connection.py", line 323, in drain_events
return self.transport.drain_events(self.connection,**kwargs)
File "/home/handerllon/.local/share/virtualenvs/project-6NphltMm/lib/python3.7/site-packages/kombu/transport/virtual/base.py", line 966, in drain_events
raise socket.timeout()
socket.timeout
暂无答案!
目前还没有任何答案,快来回答吧!