限制Akka流中的Source滴答频率,并始终对最新消息采取行动

1u4esq0p  于 12个月前  发布在  其他
关注(0)|答案(2)|浏览(127)

以下是问题的背景:
1.有一个源头,它不断地滴答作响,没有关于滴答频率的保证
1.我们希望限制源的最大滴答速率(例如,我们在DB中启动消息,并且我们不希望存储频率超过每7秒)
1.我们只对最新的事件感兴趣,所以如果在5秒的等待时间内有新的东西发出,我们只对它感兴趣。
这是我能想到的最好的:

Source.tick(5.seconds, 3.seconds, 1)
    .scan(0)((a, b) => a + b) // have a counter
    .wireTap(num => logger.warn(s"up   ${num.formatted("%02d")}"))
    .buffer(1, OverflowStrategy.dropHead)
    .throttle(1, 7.seconds)
    .wireTap(num => logger.warn(s"down     ${num.formatted("%02d")}"))
    .runWith(Sink.ignore)(materializer)

这几乎像我希望的那样工作:有一个节流阀,不会让超过一个项目每7秒,有一个缓冲区之前,节流阀将保留一个单一的元素,并取代它与一个新的到来。
然而,当我检查日志时,我可以看到行为是次优的:

up   01
down     01
up   02
up   03
down     02
up   04
up   05
up   06
down     03
up   07
up   08
down     06
up   09
up   10
down     08

throttle并不从缓冲区中获取最新的元素,而是使用最后一个被throttle的元素被释放时缓冲区中的元素。也就是说,它看起来不是暂停然后检查一个新元素,而是throttle接受一个元素并等待它,直到计时器完成。
有没有更好的方法来做到这一点?或者我应该实现自己的流程?

bn31dyow

bn31dyow1#

GraphStage中实现您自己的,您可以精确控制何时以及如何推/拉元素。
这里有一个例子

class LastElementWithin[A](duration: FiniteDuration) extends GraphStage[FlowShape[A, A]] {

    private val in = Inlet[A]("LastElementWithin.in")
    private val out = Outlet[A]("LastElementWithin.out")

    override val shape: FlowShape[A, A] = FlowShape(in, out)

    private sealed trait CallbackEvent

    private case object Pull extends CallbackEvent

    private case object Push extends CallbackEvent

    private case object Flush extends CallbackEvent

    private case object Finish extends CallbackEvent

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new TimerGraphStageLogic(shape) with StageLogging {

        setHandlers(
          in = in,
          out = out,
          handler = new AbstractInOutHandler {

            override def onPush(): Unit = asyncCallback.invoke(Push)

            override def onPull(): Unit = asyncCallback.invoke(Pull)

            override def onUpstreamFinish(): Unit = asyncCallback.invoke(Finish)
          }
        )

        private val FlushTimerKey = "Flush"

        protected override def onTimer(timerKey: Any): Unit = {
          if (timerKey == FlushTimerKey) {
            asyncCallback.invoke(Flush)
          }
        }

        private val asyncCallback = createAsyncCallback(new Procedure[CallbackEvent] {

          private var last: Option[A] = None

          override def apply(param: CallbackEvent): Unit = {
            param match {
              case Pull   => onPull()
              case Push   => onPush()
              case Finish => onFinish()
              case Flush  => flush()
            }
          }

          private def onPull(): Unit = {
            if (!isTimerActive(FlushTimerKey)) scheduleOnce(FlushTimerKey, duration)
            if (!hasBeenPulled(in)) pull(in)
          }

          private def onPush(): Unit = {
            last = Some(grab(in))
            pull(in)
          }

          private def onFinish(): Unit = {
            cancelTimer(FlushTimerKey)
            last.foreach(emit(out, _))
            completeStage()
          }

          private def flush(): Unit = {
            if (isAvailable(out)) {
              last.foreach(emit(out, _))
              scheduleOnce(FlushTimerKey, duration)
            }
          }
        })
      }
  }

在流动中运行

implicit val as: ActorSystem = ActorSystem("test")
    val done = Source
      .tick(5.nanoseconds, 3.seconds, 1)
      .scan(0)((a, b) => a + b)
      .wireTap(num => println(s"up   ${"%02d".format(num)}"))
      .via(Flow.fromGraph(new LastElementWithin(7.seconds)))
      .wireTap(num => println(s"down ${"%02d".format(num)}"))
      .toMat(Sink.ignore)(Keep.right)
      .run()

产生

up   00
up   01
up   02
up   03
down 03
up   04
up   05
down 05
up   06
up   07
down 07
up   08
up   09
up   10
down 10
l7wslrjt

l7wslrjt2#

要获取时间窗口内n个元素中的最后一个元素,您可以使用groupedWithin运算符,然后使用map,例如:

Source
    .tick(5.seconds, 1.second, 1)
    .scan(0)((a, b) => a + b)
    .wireTap(num => println(s"up ${"%02d".format(num)}"))
    .groupedWithin(Int.MaxValue, 7.seconds)
    .map(_.last)
    .wireTap(num => println(s"down ${"%02d".format(num)}"))
    .runWith(Sink.ignore)

相关问题