我有数据从传感器服务器流到我的主题,我无法控制。
在主题a中,有多个传感器数据有效载荷进入(a、b、c、d…)。
在主题b中,出现了指示消息(如1,2,…),告诉我从现在起,从主题a传入的传感器数据属于新对象x,而不是x-1
我想将主题a中的数据与当时主题b中的当前对象相关联。
我对ksql和流逻辑相当陌生,所以我不知道这是否可行。感觉可能有一个非常简单的解决方案,但我在示例中没有找到类似的解决方案。
编辑:
传感器数据(主题a)可能如下所示:
sensorPath timestamp value
simulation/machine/plc/sensor-1 | 1 | 7.0
simulation/machine/plc/sensor-2 | 1 | 2.0
simulation/machine/plc/sensor-1 | 2 | 6.0
simulation/machine/plc/sensor-2 | 2 | 1.0
...
simulation/machine/plc/sensor-1 | 10 | 10.0
simulation/machine/plc/sensor-2 | 10 | 12.0
指标数据(主题b)可以如下所示
informationPath timestamp WorkpieceID
simulation/informationString | 1 | 0020181
simulation/informationString | 10 | 0020182
我基本上想将传感器数据与新主题/流中的相应工件相匹配。新到达的传感器数据总是属于最新的信息串/工件。
主题c应该是这样的:
sensorPath SensorTimestamp value WorkpieceID
simulation/machine/plc/sensor-1 | 1 | 7.0 | 0020181
simulation/machine/plc/sensor-2 | 1 | 2.0 | 0020181
simulation/machine/plc/sensor-1 | 2 | 6.0 | 0020181
simulation/machine/plc/sensor-2 | 2 | 1.0 | 0020181
...
simulation/machine/plc/sensor-1 | 10 | 10.0| 0020182
simulation/machine/plc/sensor-2 | 10 | 12.0| 0020182
所以我需要像topica.timestamp>=current(topicb.timestamp)上的join这样的东西?!
1条答案
按热度按时间sq1bmfud1#
是的,您可以用ksql来实现这一点。这是一个成功的例子。如果您想重现下面的示例,我在这里使用这个docker compose文件作为我的测试环境。
首先,我根据您提供的示例填充一些测试数据。我已经根据当前的历元,+2和+10秒制作了时间戳:
传感器测试数据:
指标测试数据:
现在,我启动ksql cli:
在ksql中,我们可以检查主题中的源数据:
现在我们注册主题以在ksql中使用,并声明模式:
我们可以查询已创建的ksql流:
请注意
ROWTIME
这条溪流的形状不同于ROWTIME
在PRINT
输出。那是因为PRINT
输出显示kafka消息的时间戳,而在流中我们重写了WITH
子句改为使用timestamp
来自消息负载本身的列。为了把这两个主题结合起来,我们要做两件事:
创建一个用于连接它们的人工键,因为当前数据中不存在任何键。我们还将应用这个新列作为kafka消息的键(这是进行连接所必需的)。
将“indicator”事件流建模为ksql表。这使我们能够查询
WorkpieceID
基于时间戳的值要添加一个人工连接键,只需选择一个常量并用
AS
子句,并将其用作消息键PARTITION BY
:出于兴趣,我们可以检查创建的Kafka主题
请注意,rowkey现在是join\ u键,而不是前面在
PRINT 'sensor'
输出。如果你忽略了PARTITION BY
然后添加join_键,但消息保持未设置键,这不是我们希望join能够工作的。现在我们也重新输入指标数据:
重新设置了指标数据的键,现在可以将其注册为ksql表。在表中,键的状态由ksql返回,而不是由每个事件返回。我们用这个方法来确定
WorkpieceID
根据时间戳与传感器读数关联。查询表时会显示一个值,即当前状态:
如果此时您向
indicator
主题时,表的状态将更新,您将看到从SELECT
.最后,我们可以做一个流表连接,持久化到一个新主题:
检查新流:
查询新流:
因为这是ksql,所以
SENSOR_ENRICHED
流(和同名的基础主题)将由到达sensor
主题并根据发送到的事件反映任何状态更改indicator
主题。