我有一个包含动态参数的任务,我想周期性地运行,当任务在django celery beat中被调用时,我如何将动态元素传递给任务参数?下面是我想周期性地运行的任务:
@task(bind=True)
def generate_export(export_type, xform, export_id=None, options=None):
"""
Create appropriate export object given the export type.
param: export_type
param: xform
params: export_id: ID of export object associated with the request
param: options: additional parameters required for the lookup.
binary_select_multiples: boolean flag
end: end offset
ext: export extension type
dataview_pk: dataview pk
group_delimiter: "/" or "."
query: filter_query for custom queries
remove_group_name: boolean flag
split_select_multiples: boolean flag
index_tag: ('[', ']') or ('_', '_')
show_choice_labels: boolean flag
language: language labels as in the XLSForm/XForm
"""
username = xform.user.username
id_string = xform.id_string
end = options.get("end")
extension = options.get("extension", export_type)
filter_query = options.get("query")
remove_group_name = options.get("remove_group_name", False)
start = options.get("start")
export_type_func_map = {
Export.XLS_EXPORT: 'to_xls_export',
Export.CSV_EXPORT: 'to_flat_csv_export',
Export.DHIS2CSV_EXPORT: 'to_dhis2csv_export',
Export.CSV_ZIP_EXPORT: 'to_zipped_csv',
Export.SAV_ZIP_EXPORT: 'to_zipped_sav',
Export.GOOGLE_SHEETS_EXPORT: 'to_google_sheets',
}
if xform is None:
xform = XForm.objects.get(
user__username__iexact=username, id_string__iexact=id_string)
dataview = None
if options.get("dataview_pk"):
dataview = DataView.objects.get(pk=options.get("dataview_pk"))
records = dataview.query_data(dataview, all_data=True,
filter_query=filter_query)
total_records = dataview.query_data(dataview,
count=True)[0].get('count')
else:
records = query_data(xform, query=filter_query, start=start, end=end)
if filter_query:
total_records = query_data(xform, query=filter_query, start=start,
end=end, count=True)[0].get('count')
else:
total_records = xform.num_of_submissions
if isinstance(records, QuerySet):
records = records.iterator()
export_builder = ExportBuilder()
export_builder.TRUNCATE_GROUP_TITLE = True \
if export_type == Export.SAV_ZIP_EXPORT else remove_group_name
export_builder.GROUP_DELIMITER = options.get(
"group_delimiter", DEFAULT_GROUP_DELIMITER
)
export_builder.SPLIT_SELECT_MULTIPLES = options.get(
"split_select_multiples", True
)
export_builder.BINARY_SELECT_MULTIPLES = options.get(
"binary_select_multiples", False
)
export_builder.INCLUDE_LABELS = options.get('include_labels', False)
export_builder.INCLUDE_LABELS_ONLY = options.get(
'include_labels_only', False
)
export_builder.INCLUDE_HXL = options.get('include_hxl', False)
export_builder.INCLUDE_IMAGES \
= options.get("include_images", settings.EXPORT_WITH_IMAGE_DEFAULT)
export_builder.VALUE_SELECT_MULTIPLES = options.get(
'value_select_multiples', False)
export_builder.REPEAT_INDEX_TAGS = options.get(
"repeat_index_tags", DEFAULT_INDEX_TAGS
)
export_builder.SHOW_CHOICE_LABELS = options.get('show_choice_labels',
False)
export_builder.language = options.get('language')
# 'win_excel_utf8' is only relevant for CSV exports
if 'win_excel_utf8' in options and export_type != Export.CSV_EXPORT:
del options['win_excel_utf8']
export_builder.set_survey(xform.survey, xform)
# change the dhis2csv exports to standard csv format
if extension == 'dhis2csv':
extension = 'csv'
temp_file = NamedTemporaryFile(suffix=("." + extension))
columns_with_hxl = export_builder.INCLUDE_HXL and get_columns_with_hxl(
xform.survey_elements)
# get the export function by export type
func = getattr(export_builder, export_type_func_map[export_type])
try:
func.__call__(
temp_file.name, records, username, id_string, filter_query,
start=start, end=end, dataview=dataview, xform=xform,
options=options, columns_with_hxl=columns_with_hxl,
total_records=total_records
)
except NoRecordsFoundError:
pass
except SPSSIOError as e:
export = get_or_create_export(export_id, xform, export_type, options)
export.error_message = str(e)
export.internal_status = Export.FAILED
export.save()
report_exception("SAV Export Failure", e, sys.exc_info())
return export
# generate filename
basename = "%s_%s" % (
id_string, datetime.now().strftime("%Y_%m_%d_%H_%M_%S_%f"))
if remove_group_name:
# add 'remove group name' flag to filename
basename = "{}-{}".format(basename, GROUPNAME_REMOVED_FLAG)
if dataview:
basename = "{}-{}".format(basename, DATAVIEW_EXPORT)
filename = basename + "." + extension
# check filename is unique
while not Export.is_filename_unique(xform, filename):
filename = increment_index_in_filename(filename)
file_path = os.path.join(
username,
'exports',
id_string,
export_type,
filename)
# seek to the beginning as required by storage classes
temp_file.seek(0)
export_filename = default_storage.save(file_path,
File(temp_file, file_path))
temp_file.close()
dir_name, basename = os.path.split(export_filename)
# get or create export object
export = get_or_create_export(export_id, xform, export_type, options)
export.filedir = dir_name
export.filename = basename
export.internal_status = Export.SUCCESSFUL
# do not persist exports that have a filter
# Get URL of the exported sheet.
if export_type == Export.GOOGLE_SHEETS_EXPORT:
export.export_url = export_builder.url
# if we should create a new export is true, we should not save it
if start is None and end is None:
export.save()
return export
这就是我所说的celery 节拍时间表中的任务
CELERY_BEAT_SCHEDULE = {
'download_csv': {
'task': 'onadata.libs.utils.export_tools.generate_export',
# There are 4 ways we can handle time, read further
'schedule': crontab(minute='*'),
# If you're using any arguments
'args': ()
}
}
如何将参数传递到任务的实参中?
2条答案
按热度按时间rqdpfwrv1#
Celery Beat中没有动态传递参数的方法,我认为你的函数不适合周期性任务。
不是直接给
generate_export
函数一个因子,而是必须修改它以获取函数中所需的项,或者改为简单的异步操作。wgeznvg72#
我也遇到过类似的问题,
beat_schedule
中的args
字段在启动时是固定的,以后不会更改。但是,有一种方法可以向任务传递不同的参数。
使用
before_task_publish
信号在标题中添加自定义数据。默认情况下,Celery使用JSON序列化器,因此,确保添加到头中的数据是JSON可序列化的,或者,您可以使用pickle序列化数据,但这会带来安全问题。
现在您可以在绑定任务中访问这些标头。