所以我在评估Kafka。在我们的用例中,必须创建包含从一个事件到下一个事件的“时间流逝”的新主题,本质上是因为传感器将向kafka报告“on”或“off”。所以有了时间戳、传感器名称和状态,就可以创建持续时间为“开”和“关”状态的新主题。
这在ksql中是否可行,以及如何实现?
或者真的应该让消费者或流处理器来解决这个问题吗?
我的数据是这样的:
{ 2019:02:15 00:00:00, sensor1, off}
{ 2019:02:15 00:00:30, sensor1, on}
为了得到结果
{ 2019:02:15 00:30:00, sensor1, off, 30sec }.
基本上必须组合多个传感器的状态来确定机器的组合状态。工厂里有成百上千的传感器
2条答案
按热度按时间yks3o0rb1#
这是相当容易在Kafka流,所以我会选择2。
首先,必须正确地建模输入数据。您的示例使用本地时间,这使得无法计算两个时间戳之间的持续时间。使用类似epoch time的东西。
从源数据模型开始,比如
和一个目标
现在您已经定义了输入和输出流(但请参阅“数据类型和序列化”),您只需要通过定义
ValueTransformer
.它必须做两件事:
检查传感器历史数据的状态存储,必要时用新数据更新
当历史数据可用时,计算时间戳之间的差异,并将数据与计算的持续时间一起发出
在一个简单的拓扑中把所有的东西放在一起(“连接处理器和状态存储”):
完整的应用程序位于github.com/melsicon/kafka-sensors。
u4dcyp6a2#
跟进来自https://github.com/confluentinc/ksql/issues/2562 为了使用自连接,我提出了以下解决方案:
创建数据
这里我们假设连续事件已经有了counter属性。这种计数器也可以通过简单地聚合随时间变化的事件计数与ksql一起添加。
区分功能
测试一下
传感器本身被忽略了,以保持解决方案的简单性。但是,可以通过使用用于分区的复合键轻松地添加它,如中所述https://www.confluent.io/stream-processing-cookbook/ksql-recipes/creating-composite-key