因此,我使用Akka Typed,并希望将每条消息的actor生成到某个流中,根据文档,这似乎是不可能的:
- 警告 *:此方法不是线程安全的,不能从普通参与者消息处理线程(如scala.concurrent.Future回调)以外的线程访问。
def spawn[U](behavior: Behavior[U], name: String, props: Props = Props.empty): ActorRef[U]
示例:
Behaviors.receiveMessage {
case StartConsume =>
context.log.info("Starting consume messages")
val source: Source[Int, NotUsed] = Source(1 to 10)
source.runForeach(x => context.spawn(Test(x), "Test"))
Behaviors.same
}
有没有其他方法可以做到这一点?
1条答案
按热度按时间5n0oy7gb1#
由于流将具体化为一个不同的参与者,因此几乎可以肯定的是,您不能关闭流中的
ActorContext
(如果它碰巧在与封闭参与者上次运行的线程相同的线程中执行,它不会爆炸),例如,为了生成一个子对象。作为备选方案:
system.actorOf
),你可以让守护演员(具有衍生ActorSystem
的行为的演员)衍生演员:您可以滚动您自己的协议来进行这样的繁殖,或者让守护者实现SpawnProtocol
。然后您可以向context.system
发送适当的消息,但请注意,您需要使用context.system.unsafeUpcast
来使用您正在使用的协议。由于您应该控制守护者的协议,因此这不太可能失败,但编译器实际上不会帮助您。spawn
,而不必在流中。