如何从Akka事件流构建Akka流源?

nzkunb0c  于 2022-11-06  发布在  其他
关注(0)|答案(2)|浏览(187)

MyActor接收到Start消息时,它运行一个Akka Stream,并将接收到的每个项目发布到Akka Event Stream

class MyActor (implicit system: ActorSystem, materialize: Materializer, ec: ExecutionContext) extends Actor {

  override def receive: Receive = {
    case Start =>
      someSource
        .toMat(Sink.foreach(item => system.eventStream.publish(item)))(Keep.left)
        .run()
  }
}

现在,在另一个代码块中,我希望从该事件流中的那些项构建一个Source,这样,发布的每个项都可以在另一个Akka Stream中处理。
我怎么能那样做呢?
为了防止可能添加更多选项,请注意,另一个有问题的代码块是Play framework的WebSocket处理程序。

5fjcxozz

5fjcxozz1#

我终于让它与BroadcastHub一起工作了,不再有Akka事件流了。
我的发布者(它本身正在使用一个Source)看起来像这样:

val publisherSource = someSource
  .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)
  .run()

然后在另一个代码块中,我只需要一个对publisherSource的引用:

val subscriberSource = publisherSource
  .map(...) // Whatever

你可以有尽可能多的subscriberSource,因为你想要的,他们都会收到相同的项目。

zlwx9yxi

zlwx9yxi2#

这看起来像是XY problem。如果发布者和订阅者最终分离,那么如果发布者产生数据的速度比订阅者快,会发生什么?
话虽如此,这里有一个方法来做你所要求的:

/**Produce a source by subscribing to the Akka actorsystem event bus for a
  * specific event type.
  * 
  * @param bufferSize max number of events to buffer up in the source
  * @param overflowStrategy what to do if the event buffer fills up
  */
def itemSource[Item : ClassTag](
  bufferSize: Int = 1000,
  overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew
)(
  implicit system: ActorSystem
): Source[Item, NotUsed] = Source
  .lazily { () =>
    val (actorRef, itemSource) = Source
      .actorRef[Item](
        completionMatcher = PartialFunction.empty,
        failureMatcher = PartialFunction.empty,
        bufferSize,
        overflowStrategy
      )
      .preMaterialize()

    system.eventStream.subscribe(actorRef, classTag[Item].runtimeClass)

    itemSource
  }
  .mapMaterializedValue(_ => NotUsed)

相关问题