我有我的celery 应用程序如下。am在Python2.7.5和celery 4.4.2上运行,操作系统是centos 7.4。这个想法是通过java\u zip\u sign\u exe变量中定义的shell脚本,在不同主机上运行的多个worker上使用celery 对繁重的文件进行签名(当文件大小以gbs为单位时,有时每个文件需要30分钟)。sign\u heavy\u java\u zip\u files()对sign\u heavy\u java\u zip\u file()进行异步调用,并为其指定所有文件名。
签名\ucelery \u app.py
app = Celery('tasks', broker='redis://:<hostname>:6379/0',backend='redis://:<hostname>:6379/0')
app.conf['worker_prefetch_multiplier'] = 1
app.conf['task_acks_late'] = True
app.conf.task_default_queue = 'default'
app.conf.tasks_queues = (
Queue('default', exchange='default', routing_key='default'),
Queue('heavy_java_zip', exchange='heavy_java_zip', routing_key='heavy_java_zip'),
@app.task
def sign_java_zip_file(filename,User,MaxPerJavaZipFileTime,Site):
print ("Started Signing " + filename + java_zip_sign_exe )
process=subprocess.Popen([java_zip_sign_exe,filename,User,str(MaxPerJavaZipFileTime),Site],stdout=subprocess.PIPE,stderr=subprocess.PIPE)
out,err = process.communicate()
print ("Finished Signing " + filename )
return (process.returncode,out,err)
@app.task(queue='heavy_java_zip')
def sign_heavy_java_zip_files(filenames,User,MaxPerJavaZipFileTime,Site):
job = group(sign_java_zip_file.s(filename,User,MaxPerJavaZipFileTime,Site) for filename in filenames )
job_results=job.apply_async(queue='heavy_java_zip')
return job_results
celery 工人开始如下
celery -A sign_celery_app worker --loglevel=info --concurrency=2 -O fair -Q "heavy_java_zip"
我的main()python调用者文件如下
results_heavy_java_zip=sign_heavy_java_zip_files.delay(heavy_java_zip_file_list,User,int(MaxPerJavaZipFileTime),Site)
results_heavy_java_zip.get(timeout=(MaxTotalJavaZipTime*60))
如预期的那样,当没有工作线程时,超时工作并引发超时异常。但是如果有工人,并且一旦任务开始异步工作,那么超时就没有任何效果。我希望即使在它们运行时,当超时结束时它们也会被中断。我的理解错了吗?
1条答案
按热度按时间dluptydi1#
我相信错误就在这里,函数调用是在主例程中完成的。
必须是(在任务调用中不需要程序进行延迟或异步调用)