我正在构建一个具有以下流程的应用程序:
1.存在要处理的项目源
1.每一项都应该由外部命令处理(最终将是ffmpeg
,但对于这个简单的可重现用例,数据通过它传递的只是cat
)
1.最后,这样的外部命令的输出被保存在某个地方(同样,为了这个例子,它只是将其保存到一个本地文本文件中)
所以我在做以下操作:
1.准备包含项目的来源
1.制作一个Akka图,使用广播将源项扇出到各个流中
1.各个流使用ProcessBuilder
和Flow.fromSinkAndSource
来构建此外部流程执行的流
1.使用将数据保存到文件的接收器结束各个流。
完整的代码示例:
import akka.actor.ActorSystem
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl._
import akka.stream.ClosedShape
import akka.util.ByteString
import java.io.{BufferedInputStream, BufferedOutputStream}
import java.nio.file.Paths
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
object MyApp extends App {
// When this is changed to something above 15, the graph just stops
val PROCESSES_COUNT = Integer.parseInt(args(0))
println(s"Running with ${PROCESSES_COUNT} processes...")
implicit val system = ActorSystem("MyApp")
implicit val globalContext: ExecutionContext = ExecutionContext.global
def executeCmdOnStream(cmd: String): Flow[ByteString, ByteString, _] = {
val convertProcess = new ProcessBuilder(cmd).start
val pipeIn = new BufferedOutputStream(convertProcess.getOutputStream)
val pipeOut = new BufferedInputStream(convertProcess.getInputStream)
Flow
.fromSinkAndSource(StreamConverters.fromOutputStream(() ⇒ pipeIn), StreamConverters.fromInputStream(() ⇒ pipeOut))
}
val source = Source(1 to 100)
.map(element => {
println(s"--emit: ${element}")
ByteString(element)
})
val sinksList = (1 to PROCESSES_COUNT).map(i => {
Flow[ByteString]
.via(executeCmdOnStream("cat"))
.toMat(FileIO.toPath(Paths.get(s"process-$i.txt")))(Keep.right)
})
val graph = GraphDSL.create(sinksList) { implicit builder => sinks =>
val broadcast = builder.add(Broadcast[ByteString](sinks.size))
source ~> broadcast.in
for (i <- broadcast.outlets.indices) {
broadcast.out(i) ~> sinks(i)
}
ClosedShape
}
Await.result(Future.sequence(RunnableGraph.fromGraph(graph).run()), Duration.Inf)
}
使用以下命令运行此命令:
sbt "run PROCESSES_COUNT"
也就是说
sbt "run 15"
在我增加“外部进程”的数量(代码中的PROCESSES_COUNT)之前,这一切都运行得很好。当它为15或更少时,一切都很顺利,但当它为16或更多时,就会发生以下情况:
1.整个执行在发出前16个项目后挂起(这16个项目是Akka的默认缓冲区大小AFAIK)
1.我可以看到系统中启动了cat
进程(全部16个)
1.当我在系统中手动终止其中一个cat
进程时,会释放一些东西,处理继续进行(当然,在结果中,一个文件是空的,因为我终止了它的处理命令)
我检查了这是由外部执行造成的(而不是Akka广播本身的限制)。
I recorded a video显示了这两种情况(首先,15个项目工作正常,然后16个项目挂起,并通过终止一个进程释放)-视频链接
代码和视频均采用this repo
我会感谢任何帮助或建议在哪里寻找解决方案,这一个。
2条答案
按热度按时间cczfrluj1#
这是一个有趣的问题,看起来流是死锁的。线程的增加可能是解决症状,但不是根本问题。
问题出在以下代码中
fromInputStream
和fromOutputStream
都将使用相同的default-blocking-io-dispatcher
,正如您所注意到的那样。使用专用线程池的原因是,这两个线程都执行阻塞正在运行的线程的Java API调用。下面是
fromInputStream
的线程堆栈跟踪的一部分,它显示了发生阻塞的位置。现在,您正在运行连接到单个
Source
的16
并发Sink
。为了支持反压,Source
将仅在所有Sink
发送pull
命令时生成一个元素。接下来发生的事情是,您同时有16个对
FileInputStream.readBytes
方法的调用,它们立即阻塞了default-blocking-io-dispatcher
的所有线程,并且没有线程留给fromOutputStream
从Source
写入任何数据或执行任何类型的工作,因此,您有一个死锁。如果增加池中的线程数,这个问题就可以解决。但这只是消除了症状。
正确的解决方案是在两个单独的线程池中运行
fromOutputStream
和fromInputStream
。具有以下配置
由于
fromOutputStream
和fromInputStream
不再共享池,因此它们可以独立执行任务。还请注意,我刚刚为每个池分配了
2
线程,以表明这与线程数无关,而是与池分离有关。我希望这有助于更好地理解 akka 溪流。
lf3rwulv2#
原来这是Akka configuration级别的阻塞IO调度程序的限制:
因此,将该值更改为大于流数量的值可以解决此问题: