KSQL: how to join a sensor-data topic to an indicator topic when they have no common ID

suzh9iv8, posted 2021-06-06 in Kafka

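I have data streaming into my topics from a sensor server that I have no control over.
On topic A, multiple sensor data payloads arrive (a, b, c, d, ...).
On topic B, indicator messages arrive (such as 1, 2, ...) telling me that from now on, the sensor data coming in on topic A belongs to a new object x rather than x-1.
I want to associate the data on topic A with the object that is current on topic B at that time.
I'm fairly new to KSQL and stream logic, so I don't know whether this is feasible. It feels like there may be a very simple solution, but I haven't found anything similar in the examples.
Edit:
The sensor data (topic A) could look like this: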

sensorPath                      | timestamp | value
simulation/machine/plc/sensor-1 | 1         | 7.0
simulation/machine/plc/sensor-2 | 1         | 2.0
simulation/machine/plc/sensor-1 | 2         | 6.0
simulation/machine/plc/sensor-2 | 2         | 1.0
...
simulation/machine/plc/sensor-1 | 10        | 10.0
simulation/machine/plc/sensor-2 | 10        | 12.0

The indicator data (topic B) could look like this:

informationPath              | timestamp | WorkpieceID
simulation/informationString | 1         | 0020181
simulation/informationString | 10        | 0020182

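I basically want to match the sensor data to the corresponding workpiece in a new topic/stream. Newly arriving sensor data always belongs to the most recent information string/workpiece.
Topic C should look like this: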

sensorPath                      | SensorTimestamp | value | WorkpieceID
simulation/machine/plc/sensor-1 | 1               | 7.0   | 0020181
simulation/machine/plc/sensor-2 | 1               | 2.0   | 0020181
simulation/machine/plc/sensor-1 | 2               | 6.0   | 0020181
simulation/machine/plc/sensor-2 | 2               | 1.0   | 0020181
...
simulation/machine/plc/sensor-1 | 10              | 10.0  | 0020182
simulation/machine/plc/sensor-2 | 10              | 12.0  | 0020182

So I need something like a join on topicA.timestamp >= current(topicB.timestamp)?!
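
To make the intended matching concrete, here is a minimal Python sketch of that "latest indicator at or before the sensor timestamp" lookup, using the sample rows above. The function name `current_workpiece` and the in-memory lists are assumptions for illustration only; this is not KSQL and says nothing about how KSQL implements joins.

```python
from bisect import bisect_right

# Indicator messages from the sample: (timestamp, WorkpieceID), sorted by timestamp.
indicators = [(1, "0020181"), (10, "0020182")]
indicator_times = [ts for ts, _ in indicators]


def current_workpiece(sensor_ts):
    """Return the WorkpieceID of the latest indicator with timestamp <= sensor_ts."""
    idx = bisect_right(indicator_times, sensor_ts) - 1
    return indicators[idx][1] if idx >= 0 else None


# A few sensor readings from the sample: (sensorPath, timestamp, value).
readings = [
    ("simulation/machine/plc/sensor-1", 1, 7.0),
    ("simulation/machine/plc/sensor-2", 2, 1.0),
    ("simulation/machine/plc/sensor-1", 10, 10.0),
]

# Attach the workpiece that was current when each reading was taken.
enriched = [(path, ts, value, current_workpiece(ts)) for path, ts, value in readings]
for row in enriched:
    print(row)
```

A reading that arrives before any indicator gets `None` here, which is the same role a null plays on the right-hand side of a left join.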

sq1bmfud #1

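Yes, you can do this in KSQL. Here's a worked example. If you want to reproduce it, I'm using this Docker Compose file for my test environment.
First, I populate some test data based on the sample you provided. I've made up timestamps based on the current epoch, with the later readings 60 and 600 seconds after the first:
Sensor test data: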

docker run --rm --interactive --network cos_default confluentinc/cp-kafkacat kafkacat -b kafka:29092 -t sensor -P <<EOF
{"sensorPath":"simulation/machine/plc/sensor-1","value":7.0,"timestamp":1541623171000}
{"sensorPath":"simulation/machine/plc/sensor-2","value":2.0,"timestamp":1541623171000}
{"sensorPath":"simulation/machine/plc/sensor-1","value":6.0,"timestamp":1541623231000}
{"sensorPath":"simulation/machine/plc/sensor-2","value":1.0,"timestamp":1541623231000}
{"sensorPath":"simulation/machine/plc/sensor-1","value":10.0,"timestamp":1541623771000}
{"sensorPath":"simulation/machine/plc/sensor-2","value":12.0,"timestamp":1541623771000}
EOF

Indicator test data:

docker run --rm --interactive --network cos_default confluentinc/cp-kafkacat kafkacat -b kafka:29092 -t indicator -P << EOF
{"informationPath":"simulation/informationString","WorkpieceID":"0020181","timestamp":1541623171000}
{"informationPath":"simulation/informationString","WorkpieceID":"0020182","timestamp":1541623771000}
EOF

Now I launch the KSQL CLI:

docker run --network cos_default --interactive --tty --rm \
    confluentinc/cp-ksql-cli:5.0.0 \
    http://ksql-server:8088

In KSQL, we can inspect the source data on the topics:

KSQL> PRINT 'sensor' FROM BEGINNING;
Format:JSON
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-1","value":7.0,"timestamp":1541623171000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-2","value":2.0,"timestamp":1541623171000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-1","value":6.0,"timestamp":1541623231000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-2","value":1.0,"timestamp":1541623231000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-1","value":10.0,"timestamp":1541623771000}
{"ROWTIME":1541624847072,"ROWKEY":"null","sensorPath":"simulation/machine/plc/sensor-2","value":12.0,"timestamp":1541623771000}

KSQL> PRINT 'indicator' FROM BEGINNING;
Format:JSON
{"ROWTIME":1541624851692,"ROWKEY":"null","informationPath":"simulation/informationString","WorkpieceID":"0020181","timestamp":1541623171000}
{"ROWTIME":1541624851692,"ROWKEY":"null","informationPath":"simulation/informationString","WorkpieceID":"0020182","timestamp":1541623771000}

Now we register the topics for use in KSQL and declare the schema:

ksql> CREATE STREAM SENSOR (SENSORPATH VARCHAR, VALUE DOUBLE, TIMESTAMP BIGINT) WITH (VALUE_FORMAT='JSON',KAFKA_TOPIC='sensor',TIMESTAMP='timestamp');

Message
----------------
Stream created
----------------
ksql> CREATE STREAM INDICATOR (INFORMATIONPATH VARCHAR, WORKPIECEID VARCHAR, TIMESTAMP BIGINT) WITH (VALUE_FORMAT='JSON',KAFKA_TOPIC='indicator',TIMESTAMP='timestamp');

Message
----------------
Stream created
----------------

We can query the KSQL streams that were created:

ksql> SET 'auto.offset.reset' = 'earliest';
ksql> SELECT ROWTIME, timestamp, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss Z'), TIMESTAMPTOSTRING(timestamp, 'yyyy-MM-dd HH:mm:ss Z') , sensorpath, value FROM sensor;
1541623171000 | 1541623171000 | 2018-11-07 20:39:31 +0000 | 2018-11-07 20:39:31 +0000 | simulation/machine/plc/sensor-1 | 7.0
1541623171000 | 1541623171000 | 2018-11-07 20:39:31 +0000 | 2018-11-07 20:39:31 +0000 | simulation/machine/plc/sensor-2 | 2.0
1541623231000 | 1541623231000 | 2018-11-07 20:40:31 +0000 | 2018-11-07 20:40:31 +0000 | simulation/machine/plc/sensor-1 | 6.0
1541623231000 | 1541623231000 | 2018-11-07 20:40:31 +0000 | 2018-11-07 20:40:31 +0000 | simulation/machine/plc/sensor-2 | 1.0
1541623771000 | 1541623771000 | 2018-11-07 20:49:31 +0000 | 2018-11-07 20:49:31 +0000 | simulation/machine/plc/sensor-1 | 10.0
1541623771000 | 1541623771000 | 2018-11-07 20:49:31 +0000 | 2018-11-07 20:49:31 +0000 | simulation/machine/plc/sensor-2 | 12.0

ksql> SELECT ROWTIME, timestamp, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss Z'), TIMESTAMPTOSTRING(timestamp, 'yyyy-MM-dd HH:mm:ss Z') , informationPath, WorkpieceID FROM indicator;
1541623171000 | 1541623171000 | 2018-11-07 20:39:31 +0000 | 2018-11-07 20:39:31 +0000 | simulation/informationString | 0020181
1541623771000 | 1541623771000 | 2018-11-07 20:49:31 +0000 | 2018-11-07 20:49:31 +0000 | simulation/informationString | 0020182

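Note that the ROWTIME of the streams differs from the ROWTIME in the PRINT output. That's because the PRINT output shows the timestamp of the Kafka message, whereas in the streams we've overridden it in the WITH clause to use the timestamp column from the message payload itself.
To join the two topics, we're going to do two things:
1. Create an artificial key on which to join them, since no key exists in the current data. We'll also apply this new column as the key of the Kafka messages (which is required for the join to work).
2. Model the "indicator" stream of events as a KSQL table. This lets us query the value of WorkpieceID based on the timestamp.
To add an artificial join key, just select a constant, alias it with the AS clause, and use it as the message key with PARTITION BY: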
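
As a rough illustration of that ROWTIME difference, the Python sketch below picks a record's time either from the broker timestamp or from a payload field, and formats it the way the queries above do. The helper names `event_time` and `format_epoch_ms` are hypothetical and only model the idea; they are not part of KSQL.

```python
from datetime import datetime, timezone


def format_epoch_ms(epoch_ms):
    """Format epoch milliseconds as 'yyyy-MM-dd HH:mm:ss Z' in UTC."""
    dt = datetime.fromtimestamp(epoch_ms // 1000, tz=timezone.utc)
    return dt.strftime("%Y-%m-%d %H:%M:%S %z")


def event_time(record, broker_timestamp_ms, timestamp_field=None):
    """Use the named payload field if one is configured, else the broker timestamp."""
    if timestamp_field is not None:
        return record[timestamp_field]
    return broker_timestamp_ms


# First sensor message and its Kafka message timestamp, both taken from the PRINT output.
record = {
    "sensorPath": "simulation/machine/plc/sensor-1",
    "value": 7.0,
    "timestamp": 1541623171000,
}
broker_ts = 1541624847072

# Without an override the record's time is when it was produced to Kafka.
print(format_epoch_ms(event_time(record, broker_ts)))
# With the payload field configured, it is the time carried in the message.
print(format_epoch_ms(event_time(record, broker_ts, "timestamp")))
```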

ksql> CREATE STREAM SENSOR_KEYED AS SELECT sensorPath, value, 'X' AS JOIN_KEY FROM sensor PARTITION BY JOIN_KEY;

Message
----------------------------
Stream created and running
----------------------------

Out of interest, we can inspect the Kafka topic that was created:

ksql> PRINT SENSOR_KEYED FROM BEGINNING;
Format:JSON
{"ROWTIME":1541623171000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-1","VALUE":7.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623171000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-2","VALUE":2.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623231000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-1","VALUE":6.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623231000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-2","VALUE":1.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623771000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-1","VALUE":10.0,"JOIN_KEY":"X"}
{"ROWTIME":1541623771000,"ROWKEY":"X","SENSORPATH":"simulation/machine/plc/sensor-2","VALUE":12.0,"JOIN_KEY":"X"}

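Note that ROWKEY is now JOIN_KEY, rather than the null seen earlier in the PRINT 'sensor' output. If you omit the PARTITION BY, JOIN_KEY is still added as a column but the messages remain unkeyed, which is not what we want if the join is to work.
Now we also rekey the indicator data: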
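
The difference between adding a column and rekeying can be sketched in Python, modelling each message as a (key, value) pair. The helpers `add_join_key` and `partition_by` are hypothetical names chosen to mirror the statement above; they only illustrate the idea.

```python
def add_join_key(value, constant="X"):
    """Return a copy of the message value with a constant JOIN_KEY column added."""
    return {**value, "JOIN_KEY": constant}


def partition_by(messages, key_field):
    """Rekey (key, value) messages so that the key is taken from a value field."""
    return [(value[key_field], value) for _, value in messages]


# Source messages have no key, as shown by "ROWKEY":"null" in the PRINT output.
sensor = [
    (None, {"SENSORPATH": "simulation/machine/plc/sensor-1", "VALUE": 7.0}),
    (None, {"SENSORPATH": "simulation/machine/plc/sensor-2", "VALUE": 2.0}),
]

# Adding the column alone leaves the message key unset.
column_only = [(key, add_join_key(value)) for key, value in sensor]

# Rekeying copies the column into the message key as well.
rekeyed = partition_by(column_only, "JOIN_KEY")

print(column_only[0][0])  # key of the first message without rekeying
print(rekeyed[0][0])      # key of the first message after rekeying
```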

ksql> CREATE STREAM INDICATOR_KEYED AS SELECT informationPath, WorkpieceID, 'X' as JOIN_KEY FROM indicator PARTITION BY JOIN_KEY;

Message
----------------------------
Stream created and running
----------------------------
ksql> PRINT 'INDICATOR_KEYED' FROM BEGINNING;
Format:JSON
{"ROWTIME":1541623171000,"ROWKEY":"X","INFORMATIONPATH":"simulation/informationString","WORKPIECEID":"0020181","JOIN_KEY":"X"}
{"ROWTIME":1541623771000,"ROWKEY":"X","INFORMATIONPATH":"simulation/informationString","WORKPIECEID":"0020182","JOIN_KEY":"X"}

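With the indicator data rekeyed, we can now register it as a KSQL table. In a table, KSQL returns the state for each key rather than every event. We use this to determine which WorkpieceID to associate with a sensor reading, based on the timestamp.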

ksql> CREATE TABLE INDICATOR_STATE (JOIN_KEY VARCHAR, informationPath varchar, WorkpieceID varchar) with (value_format='json',kafka_topic='INDICATOR_KEYED',KEY='JOIN_KEY');

Message
---------------
Table created
---------------

Querying the table shows a single value, which is the current state:

ksql> SELECT * FROM INDICATOR_STATE;
1541623771000 | X | X | simulation/informationString | 0020182
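
Why two indicator events collapse to one row can be sketched in Python: a table is the result of folding the keyed events so that only the latest value per key remains. The function `materialize` is a hypothetical name for this illustration, not a KSQL API.

```python
def materialize(events):
    """Fold (key, value) events into a table holding the latest value per key."""
    table = {}
    for key, value in events:
        table[key] = value  # a later event for the same key replaces the earlier one
    return table


# The two messages on the INDICATOR_KEYED topic, in arrival order.
indicator_keyed = [
    ("X", {"INFORMATIONPATH": "simulation/informationString", "WORKPIECEID": "0020181"}),
    ("X", {"INFORMATIONPATH": "simulation/informationString", "WORKPIECEID": "0020182"}),
]

state = materialize(indicator_keyed)
print(len(indicator_keyed))       # number of events in the stream view
print(state["X"]["WORKPIECEID"])  # current state for key X in the table view
```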

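If you were to send another message to the indicator topic at this point, the table's state would update and you'd see a new row emitted from the SELECT.
Finally, we can do the stream-table join, persisting the result to a new topic: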

ksql> CREATE STREAM SENSOR_ENRICHED AS SELECT S.SENSORPATH, TIMESTAMPTOSTRING(S.ROWTIME, 'yyyy-MM-dd HH:mm:ss Z') AS SENSOR_TIMESTAMP, S.VALUE, I.WORKPIECEID FROM SENSOR_KEYED S LEFT JOIN INDICATOR_STATE I ON S.JOIN_KEY=I.JOIN_KEY;

Message
----------------------------
Stream created and running
----------------------------

Inspect the new stream:

ksql> DESCRIBE SENSOR_ENRICHED;

Name                 : SENSOR_ENRICHED
Field            | Type
----------------------------------------------
ROWTIME          | BIGINT           (system)
ROWKEY           | VARCHAR(STRING)  (system)
SENSORPATH       | VARCHAR(STRING)
SENSOR_TIMESTAMP | VARCHAR(STRING)
VALUE            | DOUBLE
WORKPIECEID      | VARCHAR(STRING)
----------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

Query the new stream:

ksql> SELECT SENSORPATH, SENSOR_TIMESTAMP, VALUE, WORKPIECEID FROM SENSOR_ENRICHED;
simulation/machine/plc/sensor-1 | 2018-11-07 20:39:31 +0000 | 7.0 | 0020181
simulation/machine/plc/sensor-2 | 2018-11-07 20:39:31 +0000 | 2.0 | 0020181
simulation/machine/plc/sensor-1 | 2018-11-07 20:40:31 +0000 | 6.0 | 0020181
simulation/machine/plc/sensor-2 | 2018-11-07 20:40:31 +0000 | 1.0 | 0020181
simulation/machine/plc/sensor-1 | 2018-11-07 20:49:31 +0000 | 10.0 | 0020182
simulation/machine/plc/sensor-2 | 2018-11-07 20:49:31 +0000 | 12.0 | 0020182

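Because this is KSQL, the SENSOR_ENRICHED stream (and the underlying topic of the same name) will be populated by any events arriving on the sensor topic, and will reflect any state changes based on events sent to the indicator topic.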
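
For intuition about why the last two readings pick up 0020182, here is a minimal Python sketch of a stream-table left join over the test data. It assumes events from both inputs are processed in timestamp order, with a table update applied before a stream event that carries the same timestamp; that tie-breaking rule and the function name `stream_table_join` are assumptions of this sketch, not a statement of how KSQL orders records internally.

```python
def stream_table_join(sensor_events, indicator_events):
    """Left-join each sensor event to the table state at the moment it is processed.

    Events are (timestamp, key, value) tuples. Both inputs are merged by
    timestamp; on ties the table update goes first (an assumption of this sketch).
    """
    tagged = [(ts, 0, key, value) for ts, key, value in indicator_events]
    tagged += [(ts, 1, key, value) for ts, key, value in sensor_events]
    table, output = {}, []
    for ts, kind, key, value in sorted(tagged, key=lambda e: (e[0], e[1])):
        if kind == 0:
            table[key] = value  # table side: remember the latest workpiece per key
        else:
            # stream side: emit one row, with None if the key has no state yet
            output.append((value["SENSORPATH"], ts, value["VALUE"], table.get(key)))
    return output


indicator = [(1541623171000, "X", "0020181"), (1541623771000, "X", "0020182")]
sensor = [
    (1541623171000, "X", {"SENSORPATH": "simulation/machine/plc/sensor-1", "VALUE": 7.0}),
    (1541623171000, "X", {"SENSORPATH": "simulation/machine/plc/sensor-2", "VALUE": 2.0}),
    (1541623231000, "X", {"SENSORPATH": "simulation/machine/plc/sensor-1", "VALUE": 6.0}),
    (1541623231000, "X", {"SENSORPATH": "simulation/machine/plc/sensor-2", "VALUE": 1.0}),
    (1541623771000, "X", {"SENSORPATH": "simulation/machine/plc/sensor-1", "VALUE": 10.0}),
    (1541623771000, "X", {"SENSORPATH": "simulation/machine/plc/sensor-2", "VALUE": 12.0}),
]

enriched = stream_table_join(sensor, indicator)
for row in enriched:
    print(row)
```

In this sketch each sensor event produces exactly one output row, and a table update on its own produces none, which is why the enriched stream has six rows for six readings.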
