关闭akka流以进行资源清理

r6l8ljro  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(340)

当使用akka流时,是否有任何方法可以关闭资源清理不再需要的流?
编辑:当源代码由无限流组成时,它可能永远不会完成,我想在完成源代码之前停止它。
用法示例:

Source.from(publisher)
      .map((p) -> p)
      .to(Sink.ignore())
      .run(materializer)

有没有办法关闭流?

6ie5vjzr

6ie5vjzr1#

你可以运行 Stream 以独立的方式 ActorMaterializer 并在一段时间后调用actormaterializer上的shutdown:

val actorSystem = ActorSystem()

val temporaryStream = {

  val localMat = ActorMaterializer()(actorSystem)

  import actorSystem.dispatcher
  actorSystem.scheduler.scheduleOnce(10 minutes) { localMat.shutdown() }

  Source.from(publisher)
        .map((p) -> p)
        .to(Sink.ignore())
        .run()(localMat)
}

类似地,您可以返回actormaterializer而不是具体化的流,并基于时间以外的一些外部条件关闭actormaterializer。

相关问题