使用 ProcessBuilder 启动超过 15 个外部进程时 Akka 流挂起
Akka stream hangs when starting more than 15 external processes using ProcessBuilder
我正在构建具有以下流程的应用程序:
- 有要处理的项目来源
- 每个项目都应由外部命令处理(最后将是
ffmpeg
,但对于这个简单的可重现用例,它只是 cat
让数据通过它)
- 最后,这样的外部命令的输出被保存在某个地方(同样,为了这个例子,它只是将它保存到本地文本文件)
所以我在做以下操作:
- 准备一个包含项目的源
- 制作一个 Akka 图形,使用广播将源项目扇出到单独的流中
- 单独的流程使用
ProcessBuilder
结合 Flow.fromSinkAndSource
构建外部流程执行的流程
- 使用将数据保存到文件的接收器结束单个流。
完整代码示例:
import akka.actor.ActorSystem
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl._
import akka.stream.ClosedShape
import akka.util.ByteString
import java.io.{BufferedInputStream, BufferedOutputStream}
import java.nio.file.Paths
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
object MyApp extends App {
// When this is changed to something above 15, the graph just stops
val PROCESSES_COUNT = Integer.parseInt(args(0))
println(s"Running with ${PROCESSES_COUNT} processes...")
implicit val system = ActorSystem("MyApp")
implicit val globalContext: ExecutionContext = ExecutionContext.global
def executeCmdOnStream(cmd: String): Flow[ByteString, ByteString, _] = {
val convertProcess = new ProcessBuilder(cmd).start
val pipeIn = new BufferedOutputStream(convertProcess.getOutputStream)
val pipeOut = new BufferedInputStream(convertProcess.getInputStream)
Flow
.fromSinkAndSource(StreamConverters.fromOutputStream(() ⇒ pipeIn), StreamConverters.fromInputStream(() ⇒ pipeOut))
}
val source = Source(1 to 100)
.map(element => {
println(s"--emit: ${element}")
ByteString(element)
})
val sinksList = (1 to PROCESSES_COUNT).map(i => {
Flow[ByteString]
.via(executeCmdOnStream("cat"))
.toMat(FileIO.toPath(Paths.get(s"process-$i.txt")))(Keep.right)
})
val graph = GraphDSL.create(sinksList) { implicit builder => sinks =>
val broadcast = builder.add(Broadcast[ByteString](sinks.size))
source ~> broadcast.in
for (i <- broadcast.outlets.indices) {
broadcast.out(i) ~> sinks(i)
}
ClosedShape
}
Await.result(Future.sequence(RunnableGraph.fromGraph(graph).run()), Duration.Inf)
}
运行 使用以下命令:
sbt "run PROCESSES_COUNT"
即
sbt "run 15"
在我提高“外部进程”的数量(代码中的PROCESSES_COUNT)之前,这一切都运行良好。当它是 15 或更少时,一切顺利,但当它是 16 或更多 时,会发生以下情况:
- 发出前 16 个项目后整个执行挂起(这 16 个项目是 Akka 的默认缓冲区大小 AFAIK)
- 我可以看到系统中启动了
cat
个进程(全部 16 个进程)
- 当我在系统中手动杀死这些
cat
进程之一时,一些东西被释放并继续处理(当然结果是一个文件是空的,因为我杀死了它的处理命令)
我检查了这是肯定是由外部执行引起的(不是即 Akka Broadcast 本身的限制)。
我 recorded a video 显示了这两种情况(首先,15 个项目工作正常,然后 16 个项目挂起并通过终止一个进程释放)- link 到视频
代码和视频都在this repo
对于在哪里寻找解决方案的任何帮助或建议,我将不胜感激。
事实证明这是 Akka configuration 阻塞 IO 调度程序级别的限制:
因此将该值更改为大于流的数量解决了问题:
akka.actor.default-blocking-io-dispatcher.thread-pool-executor.fixed-pool-size = 50
这是一个有趣的问题,看起来流是 dead-locking。增加线程可能是治标不治本。
问题出在下面的代码
Flow
.fromSinkAndSource(
StreamConverters.fromOutputStream(() => pipeIn),
StreamConverters.fromInputStream(() => pipeOut)
)
fromInputStream
和 fromOutputStream
都将使用与您正确注意到的相同的 default-blocking-io-dispatcher
。使用专用线程池的原因是两者都执行阻塞 运行ning 线程的 Java API 调用。
这是 fromInputStream
的线程堆栈跟踪的一部分,显示阻塞发生的位置。
at java.io.FileInputStream.readBytes(java.base@11.0.13/Native Method)
at java.io.FileInputStream.read(java.base@11.0.13/FileInputStream.java:279)
at java.io.BufferedInputStream.read1(java.base@11.0.13/BufferedInputStream.java:290)
at java.io.BufferedInputStream.read(java.base@11.0.13/BufferedInputStream.java:351)
- locked <merged>(a java.lang.ProcessImpl$ProcessPipeInputStream)
at java.io.BufferedInputStream.read1(java.base@11.0.13/BufferedInputStream.java:290)
at java.io.BufferedInputStream.read(java.base@11.0.13/BufferedInputStream.java:351)
- locked <merged>(a java.io.BufferedInputStream)
at java.io.FilterInputStream.read(java.base@11.0.13/FilterInputStream.java:107)
at akka.stream.impl.io.InputStreamSource$$anon.onPull(InputStreamSource.scala:63)
现在,您运行宁 16
个同步 Sink
连接到单个 Source
。为了支持back-pressure,一个Source
只会在所有 Sink
发送一个pull
命令时产生一个元素。
接下来发生的事情是,您同时调用了 16 个方法 FileInputStream.readBytes
,它们立即阻塞了 default-blocking-io-dispatcher
的所有线程。并且 fromOutputStream
没有线程可以从 Source
写入任何数据或执行任何类型的工作。因此,你有一个 dead-lock.
如果增加池中的线程数,问题可以解决。但这只是消除了症状。
正确的解决方法是运行fromOutputStream
和fromInputStream
在两个独立的线程池中。以下是您的操作方法。
Flow
.fromSinkAndSource(
StreamConverters.fromOutputStream(() => pipeIn).async("blocking-1"),
StreamConverters.fromInputStream(() => pipeOut).async("blocking-2")
)
配置如下
blocking-1 {
type = "Dispatcher"
executor = "thread-pool-executor"
throughput = 1
thread-pool-executor {
fixed-pool-size = 2
}
}
blocking-2 {
type = "Dispatcher"
executor = "thread-pool-executor"
throughput = 1
thread-pool-executor {
fixed-pool-size = 2
}
}
因为他们不再共享池,所以 fromOutputStream
和 fromInputStream
都可以独立执行任务。
另请注意,我刚刚为每个池分配了 2
个线程,以表明这与线程数无关,而是与池分离有关。
我希望这有助于更好地理解 akka 流。
我正在构建具有以下流程的应用程序:
- 有要处理的项目来源
- 每个项目都应由外部命令处理(最后将是
ffmpeg
,但对于这个简单的可重现用例,它只是cat
让数据通过它) - 最后,这样的外部命令的输出被保存在某个地方(同样,为了这个例子,它只是将它保存到本地文本文件)
所以我在做以下操作:
- 准备一个包含项目的源
- 制作一个 Akka 图形,使用广播将源项目扇出到单独的流中
- 单独的流程使用
ProcessBuilder
结合Flow.fromSinkAndSource
构建外部流程执行的流程 - 使用将数据保存到文件的接收器结束单个流。
完整代码示例:
import akka.actor.ActorSystem
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl._
import akka.stream.ClosedShape
import akka.util.ByteString
import java.io.{BufferedInputStream, BufferedOutputStream}
import java.nio.file.Paths
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
object MyApp extends App {
// When this is changed to something above 15, the graph just stops
val PROCESSES_COUNT = Integer.parseInt(args(0))
println(s"Running with ${PROCESSES_COUNT} processes...")
implicit val system = ActorSystem("MyApp")
implicit val globalContext: ExecutionContext = ExecutionContext.global
def executeCmdOnStream(cmd: String): Flow[ByteString, ByteString, _] = {
val convertProcess = new ProcessBuilder(cmd).start
val pipeIn = new BufferedOutputStream(convertProcess.getOutputStream)
val pipeOut = new BufferedInputStream(convertProcess.getInputStream)
Flow
.fromSinkAndSource(StreamConverters.fromOutputStream(() ⇒ pipeIn), StreamConverters.fromInputStream(() ⇒ pipeOut))
}
val source = Source(1 to 100)
.map(element => {
println(s"--emit: ${element}")
ByteString(element)
})
val sinksList = (1 to PROCESSES_COUNT).map(i => {
Flow[ByteString]
.via(executeCmdOnStream("cat"))
.toMat(FileIO.toPath(Paths.get(s"process-$i.txt")))(Keep.right)
})
val graph = GraphDSL.create(sinksList) { implicit builder => sinks =>
val broadcast = builder.add(Broadcast[ByteString](sinks.size))
source ~> broadcast.in
for (i <- broadcast.outlets.indices) {
broadcast.out(i) ~> sinks(i)
}
ClosedShape
}
Await.result(Future.sequence(RunnableGraph.fromGraph(graph).run()), Duration.Inf)
}
运行 使用以下命令:
sbt "run PROCESSES_COUNT"
即
sbt "run 15"
在我提高“外部进程”的数量(代码中的PROCESSES_COUNT)之前,这一切都运行良好。当它是 15 或更少时,一切顺利,但当它是 16 或更多 时,会发生以下情况:
- 发出前 16 个项目后整个执行挂起(这 16 个项目是 Akka 的默认缓冲区大小 AFAIK)
- 我可以看到系统中启动了
cat
个进程(全部 16 个进程) - 当我在系统中手动杀死这些
cat
进程之一时,一些东西被释放并继续处理(当然结果是一个文件是空的,因为我杀死了它的处理命令)
我检查了这是肯定是由外部执行引起的(不是即 Akka Broadcast 本身的限制)。
我 recorded a video 显示了这两种情况(首先,15 个项目工作正常,然后 16 个项目挂起并通过终止一个进程释放)- link 到视频
代码和视频都在this repo
对于在哪里寻找解决方案的任何帮助或建议,我将不胜感激。
事实证明这是 Akka configuration 阻塞 IO 调度程序级别的限制:
因此将该值更改为大于流的数量解决了问题:
akka.actor.default-blocking-io-dispatcher.thread-pool-executor.fixed-pool-size = 50
这是一个有趣的问题,看起来流是 dead-locking。增加线程可能是治标不治本。
问题出在下面的代码
Flow
.fromSinkAndSource(
StreamConverters.fromOutputStream(() => pipeIn),
StreamConverters.fromInputStream(() => pipeOut)
)
fromInputStream
和 fromOutputStream
都将使用与您正确注意到的相同的 default-blocking-io-dispatcher
。使用专用线程池的原因是两者都执行阻塞 运行ning 线程的 Java API 调用。
这是 fromInputStream
的线程堆栈跟踪的一部分,显示阻塞发生的位置。
at java.io.FileInputStream.readBytes(java.base@11.0.13/Native Method)
at java.io.FileInputStream.read(java.base@11.0.13/FileInputStream.java:279)
at java.io.BufferedInputStream.read1(java.base@11.0.13/BufferedInputStream.java:290)
at java.io.BufferedInputStream.read(java.base@11.0.13/BufferedInputStream.java:351)
- locked <merged>(a java.lang.ProcessImpl$ProcessPipeInputStream)
at java.io.BufferedInputStream.read1(java.base@11.0.13/BufferedInputStream.java:290)
at java.io.BufferedInputStream.read(java.base@11.0.13/BufferedInputStream.java:351)
- locked <merged>(a java.io.BufferedInputStream)
at java.io.FilterInputStream.read(java.base@11.0.13/FilterInputStream.java:107)
at akka.stream.impl.io.InputStreamSource$$anon.onPull(InputStreamSource.scala:63)
现在,您运行宁 16
个同步 Sink
连接到单个 Source
。为了支持back-pressure,一个Source
只会在所有 Sink
发送一个pull
命令时产生一个元素。
接下来发生的事情是,您同时调用了 16 个方法 FileInputStream.readBytes
,它们立即阻塞了 default-blocking-io-dispatcher
的所有线程。并且 fromOutputStream
没有线程可以从 Source
写入任何数据或执行任何类型的工作。因此,你有一个 dead-lock.
如果增加池中的线程数,问题可以解决。但这只是消除了症状。
正确的解决方法是运行fromOutputStream
和fromInputStream
在两个独立的线程池中。以下是您的操作方法。
Flow
.fromSinkAndSource(
StreamConverters.fromOutputStream(() => pipeIn).async("blocking-1"),
StreamConverters.fromInputStream(() => pipeOut).async("blocking-2")
)
配置如下
blocking-1 {
type = "Dispatcher"
executor = "thread-pool-executor"
throughput = 1
thread-pool-executor {
fixed-pool-size = 2
}
}
blocking-2 {
type = "Dispatcher"
executor = "thread-pool-executor"
throughput = 1
thread-pool-executor {
fixed-pool-size = 2
}
}
因为他们不再共享池,所以 fromOutputStream
和 fromInputStream
都可以独立执行任务。
另请注意,我刚刚为每个池分配了 2
个线程,以表明这与线程数无关,而是与池分离有关。
我希望这有助于更好地理解 akka 流。