当MyActor
接收到Start
消息时,它运行一个Akka Stream
,并将接收到的每个项目发布到Akka Event Stream
。
class MyActor (implicit system: ActorSystem, materialize: Materializer, ec: ExecutionContext) extends Actor {
override def receive: Receive = {
case Start =>
someSource
.toMat(Sink.foreach(item => system.eventStream.publish(item)))(Keep.left)
.run()
}
}
现在,在另一个代码块中,我希望从该事件流中的那些项构建一个Source
,这样,发布的每个项都可以在另一个Akka Stream
中处理。
我怎么能那样做呢?
为了防止可能添加更多选项,请注意,另一个有问题的代码块是Play framework
的WebSocket处理程序。
2条答案
按热度按时间5fjcxozz1#
我终于让它与BroadcastHub一起工作了,不再有Akka事件流了。
我的发布者(它本身正在使用一个Source)看起来像这样:
然后在另一个代码块中,我只需要一个对publisherSource的引用:
你可以有尽可能多的
subscriberSource
,因为你想要的,他们都会收到相同的项目。zlwx9yxi2#
这看起来像是XY problem。如果发布者和订阅者最终分离,那么如果发布者产生数据的速度比订阅者快,会发生什么?
话虽如此,这里有一个方法来做你所要求的: