进程函数内的flink广播状态实现会话窗口

jgovgodb  于 2021-07-15  发布在  Flink
关注(0)|答案(1)|浏览(506)

我的flink应用程序设计用于处理来自传感器的物联网数据。传感器通过网关发送数据。这就是示例数据的样子 case class Data(sensorId: String, value: Float, gatewayId: String, timestamp: Long) 来自同一传感器的数据可以来自不同的网关
如果网关与网络断开连接,那么我将收到一个关于此的特殊事件 case class GatewayEvents(gatewayId: String, event: String, timestamp: Long) 以及使用连接到来自传感器的主数据流的广播流
传感器可能在两种情况下不发送数据,
它坏了
网关与网络断开连接(将接收 GatewayEvents("gwId","disconnected",1617979694) 广播流中的消息)
如果我收到一条消息说某个网关与网络断开连接,并且通过它发送数据的传感器停止发送数据(例如,在1分钟内),我需要创建一个特殊事件
我的半实现如下所示:

case class Data(sensorId: String, value: Float, gatewayId: String)
case class GatewayEvents(gatewayId: String, event: String, timestamp: Long)

val sensorData: DataStream[Data] ...
val gwData: DataStream[GatewayEvents] ...

val gatewayBroadcastStateDescriptor = new MapStateDescriptor[String, GatewayEvents]("gatewayEvents", classOf[String], classOf[GatewayEvents])
val broadcastGatewayEventsStream = gwData.broadcast(gatewayBroadcastStateDescriptor)

val events: sensorData.
  .keyBy(_.sensorId)
  .connect(broadcastGatewayEventsStream)
  .process(...)

无法执行此过程。有什么想法吗?我认为sessionwindows会对我有所帮助,但我不知道怎样做才是最好的

a0x5cqrl

a0x5cqrl1#

所以,最简单的方法就是在这种情况下使用计时器。所以,基本上你可以实现 KeyedCoProcess 如果接收到 GatewayDisconnected 消息您将注册计时器(处理时间)以在所需时间后触发。如果传感器收到任何消息,您只需删除已注册的计时器,这样它就不会触发。内部 onTimer 函数可以简单地发出所需的事件,因为如果计时器触发,则意味着timespan中没有值。
这里要注意的一点是如果你 keyBy(_.sensorId) 这意味着将为通过该网关接收到的每个传感器生成事件。如果您只想为gatewa发出一个事件,只需将分区更改为 keyBy(_.gatewayId) .

相关问题