我的flink应用程序设计用于处理来自传感器的物联网数据。传感器通过网关发送数据。这就是示例数据的样子 case class Data(sensorId: String, value: Float, gatewayId: String, timestamp: Long)
来自同一传感器的数据可以来自不同的网关
如果网关与网络断开连接,那么我将收到一个关于此的特殊事件 case class GatewayEvents(gatewayId: String, event: String, timestamp: Long)
以及使用连接到来自传感器的主数据流的广播流
传感器可能在两种情况下不发送数据,
它坏了
网关与网络断开连接(将接收 GatewayEvents("gwId","disconnected",1617979694)
广播流中的消息)
如果我收到一条消息说某个网关与网络断开连接,并且通过它发送数据的传感器停止发送数据(例如,在1分钟内),我需要创建一个特殊事件
我的半实现如下所示:
case class Data(sensorId: String, value: Float, gatewayId: String)
case class GatewayEvents(gatewayId: String, event: String, timestamp: Long)
val sensorData: DataStream[Data] ...
val gwData: DataStream[GatewayEvents] ...
val gatewayBroadcastStateDescriptor = new MapStateDescriptor[String, GatewayEvents]("gatewayEvents", classOf[String], classOf[GatewayEvents])
val broadcastGatewayEventsStream = gwData.broadcast(gatewayBroadcastStateDescriptor)
val events: sensorData.
.keyBy(_.sensorId)
.connect(broadcastGatewayEventsStream)
.process(...)
无法执行此过程。有什么想法吗?我认为sessionwindows会对我有所帮助,但我不知道怎样做才是最好的
1条答案
按热度按时间a0x5cqrl1#
所以,最简单的方法就是在这种情况下使用计时器。所以,基本上你可以实现
KeyedCoProcess
如果接收到GatewayDisconnected
消息您将注册计时器(处理时间)以在所需时间后触发。如果传感器收到任何消息,您只需删除已注册的计时器,这样它就不会触发。内部onTimer
函数可以简单地发出所需的事件,因为如果计时器触发,则意味着timespan中没有值。这里要注意的一点是如果你
keyBy(_.sensorId)
这意味着将为通过该网关接收到的每个传感器生成事件。如果您只想为gatewa发出一个事件,只需将分区更改为keyBy(_.gatewayId)
.