我想在第一次模式匹配之后跳过匹配的事件。我如何使用flink的跳跃策略?我有一个简单的假设 downloads > 1000 然后向用户发出警报。在我的实现中,在第一次匹配之后,它会不断发出警报,因为1000次之后下载的次数会增加。如何跳过第一次匹配后的所有警报?我已经阅读了skip策略文档,但是一个例子或实现将对我有所帮助。
downloads > 1000
pgpifvop1#
cep更适用于重复的模式,在这里它实际上并不起作用。您所说的跳过策略实际上被称为“匹配后跳过策略”,这意味着您要匹配多个模式。我认为更好的方法是使用过程函数,比如:
class AlertUserOnce extends ProcessFunction[element, element] { lazy val turnOff: ValueState[Boolean] = getRuntimeContext .getState(new ValueStateDescriptor[Boolean]("turn-function-off", classOf[Boolean])) override def processElement(value: element, ctx: Context, out: Collector[element]): Unit = { if (turnOff.value == null && element.downloads > 1000) { alertUser(value) turnOff.update(true) } out.collect(value) } } ... stream.keyBy(...).process(new AlertUserOnce)
1条答案
按热度按时间pgpifvop1#
cep更适用于重复的模式,在这里它实际上并不起作用。您所说的跳过策略实际上被称为“匹配后跳过策略”,这意味着您要匹配多个模式。
我认为更好的方法是使用过程函数,比如: