使用 ProcessBuilder 启动超过 15 个外部进程时 Akka 流挂起

Akka stream hangs when starting more than 15 external processes using ProcessBuilder

我正在构建具有以下流程的应用程序:

  1. 有要处理的项目来源
  2. 每个项目都应由外部命令处理(最后将是 ffmpeg,但对于这个简单的可重现用例,它只是 cat 让数据通过它)
  3. 最后,这样的外部命令的输出被保存在某个地方(同样,为了这个例子,它只是将它保存到本地文本文件)

所以我在做以下操作:

  1. 准备一个包含项目的源
  2. 制作一个 Akka 图形,使用广播将源项目扇出到单独的流中
  3. 单独的流程使用 ProcessBuilder 结合 Flow.fromSinkAndSource 构建外部流程执行的流程
  4. 使用将数据保存到文件的接收器结束单个流。

完整代码示例:

import akka.actor.ActorSystem
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl._
import akka.stream.ClosedShape
import akka.util.ByteString

import java.io.{BufferedInputStream, BufferedOutputStream}
import java.nio.file.Paths
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object MyApp extends App {

  // When this is changed to something above 15, the graph just stops
  val PROCESSES_COUNT = Integer.parseInt(args(0))

  println(s"Running with ${PROCESSES_COUNT} processes...")

  implicit val system                          = ActorSystem("MyApp")
  implicit val globalContext: ExecutionContext = ExecutionContext.global

  def executeCmdOnStream(cmd: String): Flow[ByteString, ByteString, _] = {
    val convertProcess = new ProcessBuilder(cmd).start
    val pipeIn         = new BufferedOutputStream(convertProcess.getOutputStream)
    val pipeOut        = new BufferedInputStream(convertProcess.getInputStream)
    Flow
      .fromSinkAndSource(StreamConverters.fromOutputStream(() ⇒ pipeIn), StreamConverters.fromInputStream(() ⇒ pipeOut))
  }

  val source = Source(1 to 100)
    .map(element => {
      println(s"--emit: ${element}")
      ByteString(element)
    })

  val sinksList = (1 to PROCESSES_COUNT).map(i => {
    Flow[ByteString]
      .via(executeCmdOnStream("cat"))
      .toMat(FileIO.toPath(Paths.get(s"process-$i.txt")))(Keep.right)
  })

  val graph = GraphDSL.create(sinksList) { implicit builder => sinks =>

    val broadcast = builder.add(Broadcast[ByteString](sinks.size))
    source ~> broadcast.in
    for (i <- broadcast.outlets.indices) {
      broadcast.out(i) ~> sinks(i)
    }
    ClosedShape
  }

  Await.result(Future.sequence(RunnableGraph.fromGraph(graph).run()), Duration.Inf)

}

运行 使用以下命令:

sbt "run PROCESSES_COUNT"

sbt "run 15"

在我提高“外部进程”的数量(代码中的PROCESSES_COUNT)之前,这一切都运行良好。当它是 15 或更少时,一切顺利,但当它是 16 或更多 时,会发生以下情况:

  1. 发出前 16 个项目后整个执行挂起(这 16 个项目是 Akka 的默认缓冲区大小 AFAIK)
  2. 我可以看到系统中启动了 cat 个进程(全部 16 个进程
  3. 当我在系统中手动杀死这些cat进程之一时,一些东西被释放并继续处理(当然结果是一个文件是空的,因为我杀死了它的处理命令)

我检查了这是肯定是由外部执行引起的(不是即 Akka Broadcast 本身的限制)。

recorded a video 显示了这两种情况(首先,15 个项目工作正常,然后 16 个项目挂起并通过终止一个进程释放)- link 到视频

代码和视频都在this repo

对于在哪里寻找解决方案的任何帮助或建议,我将不胜感激。

事实证明这是 Akka configuration 阻塞 IO 调度程序级别的限制:

因此将该值更改为大于流的数量解决了问题:

akka.actor.default-blocking-io-dispatcher.thread-pool-executor.fixed-pool-size = 50

这是一个有趣的问题,看起来流是 dead-locking。增加线程可能是治标不治本。

问题出在下面的代码

Flow
  .fromSinkAndSource(
    StreamConverters.fromOutputStream(() => pipeIn),
    StreamConverters.fromInputStream(() => pipeOut)
  )

fromInputStreamfromOutputStream 都将使用与您正确注意到的相同的 default-blocking-io-dispatcher。使用专用线程池的原因是两者都执行阻塞 运行ning 线程的 Java API 调用。

这是 fromInputStream 的线程堆栈跟踪的一部分,显示阻塞发生的位置。

at java.io.FileInputStream.readBytes(java.base@11.0.13/Native Method)
at java.io.FileInputStream.read(java.base@11.0.13/FileInputStream.java:279)
at java.io.BufferedInputStream.read1(java.base@11.0.13/BufferedInputStream.java:290)
at java.io.BufferedInputStream.read(java.base@11.0.13/BufferedInputStream.java:351)
- locked <merged>(a java.lang.ProcessImpl$ProcessPipeInputStream)
at java.io.BufferedInputStream.read1(java.base@11.0.13/BufferedInputStream.java:290)
at java.io.BufferedInputStream.read(java.base@11.0.13/BufferedInputStream.java:351)
- locked <merged>(a java.io.BufferedInputStream)
at java.io.FilterInputStream.read(java.base@11.0.13/FilterInputStream.java:107)
at akka.stream.impl.io.InputStreamSource$$anon.onPull(InputStreamSource.scala:63)

现在,您运行宁 16 个同步 Sink 连接到单个 Source。为了支持back-pressure,一个Source只会在所有 Sink发送一个pull命令时产生一个元素。

接下来发生的事情是,您同时调用了 16 个方法 FileInputStream.readBytes,它们立即阻塞了 default-blocking-io-dispatcher 的所有线程。并且 fromOutputStream 没有线程可以从 Source 写入任何数据或执行任何类型的工作。因此,你有一个 dead-lock.

如果增加池中的线程数,问题可以解决。但这只是消除了症状。

正确的解决方法是运行fromOutputStreamfromInputStream在两个独立的线程池中。以下是您的操作方法。

Flow
  .fromSinkAndSource(
    StreamConverters.fromOutputStream(() => pipeIn).async("blocking-1"),
    StreamConverters.fromInputStream(() => pipeOut).async("blocking-2")
  )

配置如下

blocking-1 {
  type = "Dispatcher"
  executor = "thread-pool-executor"
  throughput = 1
  thread-pool-executor {
    fixed-pool-size = 2
  }
}

blocking-2 {
  type = "Dispatcher"
  executor = "thread-pool-executor"
  throughput = 1
  thread-pool-executor {
    fixed-pool-size = 2
  }
}

因为他们不再共享池,所以 fromOutputStreamfromInputStream 都可以独立执行任务。

另请注意,我刚刚为每个池分配了 2 个线程,以表明这与线程数无关,而是与池分离有关。

我希望这有助于更好地理解 akka 流。