我正在使用一个测试设置,包括汇合平台(docker)和am处理包含以下信息的记录:传感器id、时间戳、值。使用robinhood的《浮士德》(类似于kafka streams,但在python中),我试图做到以下几点:
只要有一个传感器的新记录,就应该有一个“计时器”,如果在给定的时间内没有收到该传感器id的新记录,则应该有一个错误,指示该传感器/机器可能出现故障。
我试过使用 time.sleep()
但它只会休眠10秒,然后处理下一个记录。
有没有可能用我正在使用的设置做这样的事情?
我正在使用一个测试设置,包括汇合平台(docker)和am处理包含以下信息的记录:传感器id、时间戳、值。使用robinhood的《浮士德》(类似于kafka streams,但在python中),我试图做到以下几点:
只要有一个传感器的新记录,就应该有一个“计时器”,如果在给定的时间内没有收到该传感器id的新记录,则应该有一个错误,指示该传感器/机器可能出现故障。
我试过使用 time.sleep()
但它只会休眠10秒,然后处理下一个记录。
有没有可能用我正在使用的设置做这样的事情?
1条答案
按热度按时间3vpjnl9f1#
您可以使用ksql的窗口滚动:
创建传感器信息流;
最后创建一个表,其中包含在10秒的时间窗口内只出现一次的故障传感器: