使用ProcessBuilder启动超过15个外部进程时,Akka流挂起

7lrncoxx  于 2022-11-05  发布在  其他
关注(0)|答案(2)|浏览(156)

我正在构建一个具有以下流程的应用程序:
1.存在要处理的项目源
1.每一项都应该由外部命令处理(最终将是ffmpeg,但对于这个简单的可重现用例,数据通过它传递的只是cat
1.最后,这样的外部命令的输出被保存在某个地方(同样,为了这个例子,它只是将其保存到一个本地文本文件中)
所以我在做以下操作:
1.准备包含项目的来源
1.制作一个Akka图,使用广播将源项扇出到各个流中
1.各个流使用ProcessBuilderFlow.fromSinkAndSource来构建此外部流程执行的流
1.使用将数据保存到文件的接收器结束各个流。
完整的代码示例:

import akka.actor.ActorSystem
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl._
import akka.stream.ClosedShape
import akka.util.ByteString

import java.io.{BufferedInputStream, BufferedOutputStream}
import java.nio.file.Paths
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object MyApp extends App {

  // When this is changed to something above 15, the graph just stops
  val PROCESSES_COUNT = Integer.parseInt(args(0))

  println(s"Running with ${PROCESSES_COUNT} processes...")

  implicit val system                          = ActorSystem("MyApp")
  implicit val globalContext: ExecutionContext = ExecutionContext.global

  def executeCmdOnStream(cmd: String): Flow[ByteString, ByteString, _] = {
    val convertProcess = new ProcessBuilder(cmd).start
    val pipeIn         = new BufferedOutputStream(convertProcess.getOutputStream)
    val pipeOut        = new BufferedInputStream(convertProcess.getInputStream)
    Flow
      .fromSinkAndSource(StreamConverters.fromOutputStream(() ⇒ pipeIn), StreamConverters.fromInputStream(() ⇒ pipeOut))
  }

  val source = Source(1 to 100)
    .map(element => {
      println(s"--emit: ${element}")
      ByteString(element)
    })

  val sinksList = (1 to PROCESSES_COUNT).map(i => {
    Flow[ByteString]
      .via(executeCmdOnStream("cat"))
      .toMat(FileIO.toPath(Paths.get(s"process-$i.txt")))(Keep.right)
  })

  val graph = GraphDSL.create(sinksList) { implicit builder => sinks =>

    val broadcast = builder.add(Broadcast[ByteString](sinks.size))
    source ~> broadcast.in
    for (i <- broadcast.outlets.indices) {
      broadcast.out(i) ~> sinks(i)
    }
    ClosedShape
  }

  Await.result(Future.sequence(RunnableGraph.fromGraph(graph).run()), Duration.Inf)

}

使用以下命令运行此命令:

sbt "run PROCESSES_COUNT"

也就是说

sbt "run 15"

在我增加“外部进程”的数量(代码中的PROCESSES_COUNT)之前,这一切都运行得很好。当它为15或更少时,一切都很顺利,但当它为16或更多时,就会发生以下情况:
1.整个执行在发出前16个项目后挂起(这16个项目是Akka的默认缓冲区大小AFAIK)
1.我可以看到系统中启动了cat进程(全部16个
1.当我在系统中手动终止其中一个cat进程时,会释放一些东西,处理继续进行(当然,在结果中,一个文件是空的,因为我终止了它的处理命令)
我检查了这是由外部执行造成的(而不是Akka广播本身的限制)。
I recorded a video显示了这两种情况(首先,15个项目工作正常,然后16个项目挂起,并通过终止一个进程释放)-视频链接
代码和视频均采用this repo
我会感谢任何帮助或建议在哪里寻找解决方案,这一个。

cczfrluj

cczfrluj1#

这是一个有趣的问题,看起来流是死锁的。线程的增加可能是解决症状,但不是根本问题。
问题出在以下代码中

Flow
  .fromSinkAndSource(
    StreamConverters.fromOutputStream(() => pipeIn),
    StreamConverters.fromInputStream(() => pipeOut)
  )

fromInputStreamfromOutputStream都将使用相同的default-blocking-io-dispatcher,正如您所注意到的那样。使用专用线程池的原因是,这两个线程都执行阻塞正在运行的线程的Java API调用。
下面是fromInputStream的线程堆栈跟踪的一部分,它显示了发生阻塞的位置。

at java.io.FileInputStream.readBytes(java.base@11.0.13/Native Method)
at java.io.FileInputStream.read(java.base@11.0.13/FileInputStream.java:279)
at java.io.BufferedInputStream.read1(java.base@11.0.13/BufferedInputStream.java:290)
at java.io.BufferedInputStream.read(java.base@11.0.13/BufferedInputStream.java:351)
- locked <merged>(a java.lang.ProcessImpl$ProcessPipeInputStream)
at java.io.BufferedInputStream.read1(java.base@11.0.13/BufferedInputStream.java:290)
at java.io.BufferedInputStream.read(java.base@11.0.13/BufferedInputStream.java:351)
- locked <merged>(a java.io.BufferedInputStream)
at java.io.FilterInputStream.read(java.base@11.0.13/FilterInputStream.java:107)
at akka.stream.impl.io.InputStreamSource$$anon$1.onPull(InputStreamSource.scala:63)

现在,您正在运行连接到单个Source16并发Sink。为了支持反压,Source将仅在所有Sink发送pull命令时生成一个元素。
接下来发生的事情是,您同时有16个对FileInputStream.readBytes方法的调用,它们立即阻塞了default-blocking-io-dispatcher的所有线程,并且没有线程留给fromOutputStreamSource写入任何数据或执行任何类型的工作,因此,您有一个死锁。
如果增加池中的线程数,这个问题就可以解决。但这只是消除了症状。
正确的解决方案是在两个单独的线程池中运行fromOutputStreamfromInputStream

Flow
  .fromSinkAndSource(
    StreamConverters.fromOutputStream(() => pipeIn).async("blocking-1"),
    StreamConverters.fromInputStream(() => pipeOut).async("blocking-2")
  )

具有以下配置

blocking-1 {
  type = "Dispatcher"
  executor = "thread-pool-executor"
  throughput = 1
  thread-pool-executor {
    fixed-pool-size = 2
  }
}

blocking-2 {
  type = "Dispatcher"
  executor = "thread-pool-executor"
  throughput = 1
  thread-pool-executor {
    fixed-pool-size = 2
  }
}

由于fromOutputStreamfromInputStream不再共享池,因此它们可以独立执行任务。
还请注意,我刚刚为每个池分配了2线程,以表明这与线程数无关,而是与池分离有关。
我希望这有助于更好地理解 akka 溪流。

lf3rwulv

lf3rwulv2#

原来这是Akka configuration级别的阻塞IO调度程序的限制:

因此,将该值更改为大于流数量的值可以解决此问题:

akka.actor.default-blocking-io-dispatcher.thread-pool-executor.fixed-pool-size = 50

相关问题