当在DAG中运行PythonSensor任务时,它只会插入脚本一次,而它应该每60秒执行一次。因此,任务会无限期地停留在Running状态。
my_task = PythonSensor(
task_id='my_task_id',
python_callable=my_python_callable,
poke_interval=60,
timeout=60*15,
dag=dag)
而my_python_callable
函数:
def my_python_callable():
try:
validation_output = requests.post(...)
validation_output_json = json.loads(
validation_output.content)
if validation_output_json['is_valid'] == False:
raise ValueError(validation_output_json['error'])
except ValueError as e:
print(type(e))
print(e)
send_error(str(e))
return True
触发DAG时,我获得的与插入操作相关的唯一日志是:
[2023-01-11, 10:05:53 GMT] {python.py:72} INFO - Poking callable: <function my_python_callable at 0x7f942fcd2d40>
它不应该每60秒显示一次吗?
谢谢
1条答案
按热度按时间2lpgd9681#
传感器总是在第一次戳之后完成,因为所有路径最终都返回
True
:ValueError
在try
块中被引发,它在except
块中被处理,但不被重新引发(假设send_error()
处理错误而不引发异常?),或者返回False
以指示传感器应该重试。在except之后,它返回True
,因此您的传感器将在第一次戳之后结束。ValueError
,则返回True
。您还应该在日志中看到这样的消息,告诉您传感器条件已满足,因此进入“成功”状态时:
在
send_error()
之后,您可以返回False
,以指示传感器条件未满足,它应该重试。