Context: We use Kinesis Data Analytics to process our sensor data and find anomalies in it.
Goal: We need to identify the sensors that didn't send data for the past X minutes.
The following methods have been tried with Kinesis Data Analytics SQL, but with no luck:
Stagger Window
This technique works for the first 3 test cases, but fails for test case 4.
CREATE OR REPLACE PUMP "STREAM_PUMP_ALERT_DISCONNECTION" AS
INSERT INTO "INTERMEDIATE_SQL_STREAM"
SELECT STREAM
    "deviceID" AS "device_key",
    COUNT("deviceID") AS "device_count",
    ROWTIME AS "time"
FROM "INTERMEDIATE_SQL_STREAM_FOR_ROOM"
WINDOWED BY STAGGER (
    PARTITION BY "deviceID", ROWTIME RANGE INTERVAL '1' MINUTE);
Left join and group by
The queries below don't work for all the test cases.
Query 1:
CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS
INSERT INTO "INTERMEDIATE_SQL_STREAM_FOR_ROOM2"
SELECT STREAM
ROWTIME as "resultrowtime",
Input2."device_key" as "device_key"
FROM INTERMEDIATE_SQL_STREAM_FOR_ROOM
OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS Input1
LEFT JOIN INTERMEDIATE_SQL_STREAM_FOR_ROOM AS Input2
ON
Input1."device_key" = Input2."device_key"
AND Input1.ROWTIME <> Input2.ROWTIME;
Query 2:
CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS
INSERT INTO "INTERMEDIATE_SQL_STREAM_FOR_ROOM2"
SELECT STREAM
ROWTIME as "resultrowtime",
Input2."device_key" as "device_key"
FROM INTERMEDIATE_SQL_STREAM_FOR_ROOM
OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS Input1
LEFT JOIN INTERMEDIATE_SQL_STREAM_FOR_ROOM AS Input2
ON
Input1."device_key" = Input2."device_key"
AND TSDIFF(Input1, Input2) > 0;
Query 3:
CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP" AS
INSERT INTO "INTERMEDIATE_SQL_STREAM_FOR_ROOM2"
SELECT STREAM
ROWTIME as "resultrowtime",
Input2."device_key" as "device_key"
FROM INTERMEDIATE_SQL_STREAM_FOR_ROOM
OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS Input1
LEFT JOIN INTERMEDIATE_SQL_STREAM_FOR_ROOM AS Input2
ON
Input1."device_key" = Input2."device_key"
AND Input1.ROWTIME = Input2.ROWTIME;
CREATE OR REPLACE PUMP "OUTPUT_STREAM_PUMP2" AS
INSERT INTO "DIS_CONN_DEST_SQL_STREAM_ALERT"
SELECT STREAM "device_key", "count"
FROM (
SELECT STREAM
"device_key",
COUNT(*) as "count"
FROM INTERMEDIATE_SQL_STREAM_FOR_ROOM2
GROUP BY FLOOR(INTERMEDIATE_SQL_STREAM_FOR_ROOM2.ROWTIME TO MINUTE), "device_key"
)
WHERE "count" = 1;
Here are test cases for your reference:
Test case 1:
- Device 1 and Device 2 send data continuously to Kinesis Data Analytics.
- After X minutes, Device 2 continues to send data, but Device 1 stops.
Output for test case 1:
We want Kinesis Data Analytics to flag Device 1, so that we know which device is not sending data.
Test case 2 (Interval - 10 minutes)
- Device 1 sends data at 09:00
- Device 2 sends data at 09:02
- Device 2 sends data again at 09:11, but Device 1 doesn't send any data.
Output for test case 2:
We want Kinesis Data Analytics to flag Device 1, so that we know which device is not sending data.
Test case 3 (Interval - 10 minutes)
- Device 1 and Device 2 send data continuously to Kinesis Data Analytics.
- Both devices (1 & 2) don't send any data for the next 15 minutes.
Output for test case 3:
We want Kinesis Data Analytics to flag Device 1 and Device 2, so that we know which devices are not sending data.
Test case 4 (Interval - 10 minutes)
- Device 1 sends data at 09:00
- Device 2 sends data at 09:02
- Device 1 again sends data at 09:04
- Device 2 again sends data at 09:06
- Then no data
Output for test case 4:
We want the analytics to flag Device 1 at 09:14 and Device 2 at 09:16, so that we can identify the disconnected devices (i.e., devices not sending data) after the exact interval.
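For reference, the timeout behavior the test cases describe can be sketched in plain Python. This is an illustration only, not Kinesis Data Analytics SQL; `detect_disconnects` and the timestamps are hypothetical:

```python
from datetime import datetime, timedelta

def detect_disconnects(events, interval):
    """events: list of (timestamp, device_id) tuples.
    Returns [(alert_time, device_id)]: each device is reported
    `interval` after its last event, as test case 4 requires."""
    last_seen = {}
    for ts, device in sorted(events):
        last_seen[device] = ts  # every event restarts the device's timer
    return sorted((ts + interval, device) for device, ts in last_seen.items())

# Replay the test case 4 events with a 10-minute interval:
t = lambda hhmm: datetime(2023, 1, 1, int(hhmm[:2]), int(hhmm[3:]))
events = [(t("09:00"), "device1"), (t("09:02"), "device2"),
          (t("09:04"), "device1"), (t("09:06"), "device2")]
alerts = detect_disconnects(events, timedelta(minutes=10))
# alerts: device1 at 09:14, device2 at 09:16, matching the expected output
```

This is exactly the per-key timer behavior that is hard to express in windowed SQL, because a silent device never contributes a row for a window to aggregate.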
Note: AWS Support directed us to simple queries that don't answer the question. It seems they can help with the exact query only if we upgrade our support plan.
1 Answer
I'm not familiar with all the ways AWS has extended or modified Apache Flink, but open-source Flink doesn't offer a simple, built-in way to detect that a source has stopped sending data. One solution is to use something like a process function with processing-time timers to detect the absence of data.
The documentation has an example along these lines: https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/process_function/#example
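The linked Flink example keeps the last-seen timestamp per key in state and registers a processing-time timer that fires if no new element arrives before the timeout. Below is a rough plain-Python illustration of that pattern (not real Flink code; the `process_element`/`on_timer` names merely mimic the KeyedProcessFunction hooks):

```python
from datetime import datetime, timedelta

class IdleDeviceDetector:
    """Per-key timeout detection, loosely modeled on Flink's
    KeyedProcessFunction: each element refreshes its key's timer,
    and a timer that expires without a refresh emits an alert."""

    def __init__(self, timeout):
        self.timeout = timeout
        self.deadline = {}   # per-key state: when the pending timer fires
        self.alerts = []

    def process_element(self, device, ts):
        # Overwriting the entry plays the role of cancelling the old
        # timer and registering a new one, as the Flink example does.
        self.deadline[device] = ts + self.timeout

    def on_timer(self, now):
        # Fire every timer whose deadline has passed.
        for device, deadline in list(self.deadline.items()):
            if deadline <= now:
                self.alerts.append((deadline, device))
                del self.deadline[device]

# Replay test case 2 (interval = 10 minutes):
det = IdleDeviceDetector(timedelta(minutes=10))
det.process_element("device1", datetime(2023, 1, 1, 9, 0))
det.process_element("device2", datetime(2023, 1, 1, 9, 2))
det.on_timer(datetime(2023, 1, 1, 9, 11))        # device1 expired at 09:10
det.process_element("device2", datetime(2023, 1, 1, 9, 11))
# det.alerts now holds [(09:10, "device1")]; device2's timer was refreshed in time
```

In real Flink, the state would live in keyed `ValueState` and the timers in the `TimerService`, so the same logic scales out per device key.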