python 气流传感器在运行时仅可调用一次

zf2sa74q  于 2023-01-12  发布在  Python
关注(0)|答案(1)|浏览(90)

当在DAG中运行PythonSensor任务时,它只会插入脚本一次,而它应该每60秒执行一次。因此,任务会无限期地停留在Running状态。

my_task = PythonSensor(
    task_id='my_task_id',
    python_callable=my_python_callable,
    poke_interval=60,
    timeout=60*15,
    dag=dag)

my_python_callable函数:

def my_python_callable():

    try:

        validation_output = requests.post(...) 

        validation_output_json = json.loads(
            validation_output.content)

        if validation_output_json['is_valid'] == False:
            raise ValueError(validation_output_json['error'])
    
    except ValueError as e:

        print(type(e))
        print(e)

        send_error(str(e))

    return True

触发DAG时,我获得的与插入操作相关的唯一日志是:

[2023-01-11, 10:05:53 GMT] {python.py:72} INFO - Poking callable: <function my_python_callable at 0x7f942fcd2d40>

它不应该每60秒显示一次吗?
谢谢

2lpgd968

2lpgd9681#

传感器总是在第一次戳之后完成,因为所有路径最终都返回True

  • 如果ValueErrortry块中被引发,它在except块中被处理,但不被重新引发(假设send_error()处理错误而不引发异常?),或者返回False以指示传感器应该重试。在except之后,它返回True,因此您的传感器将在第一次戳之后结束。
  • 如果没有引发ValueError,则返回True

您还应该在日志中看到这样的消息,告诉您传感器条件已满足,因此进入“成功”状态时:

Success criteria met. Exiting.

send_error()之后,您可以返回False,以指示传感器条件未满足,它应该重试。

相关问题