High CPU usage through context switches in Akka application
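I am maintaining and developing two Akka Scala applications that interface with a serial device to gather sensor information. The main difference between the two is that one (my CO2 sensor application) uses 1% CPU while the other (my power sensor application) uses 250% CPU. This is the case both on a Linux machine (Raspberry Pi 3) and on my Windows desktop PC. Code-wise, the main difference is that the CO2 application uses the serial library (http://fazecast.github.io/jSerialComm/) directly, while the power sensor application goes through a layer of middleware that converts the In/OutputStreams of the serial library to an Akka Source/Sink, like this: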
val port = SerialPort.getCommPort(comPort)
port.setBaudRate(baudRate)
port.setFlowControl(flowControl)
port.setComPortParameters(baudRate, dataBits, stopBits, parity)
port.setComPortTimeouts(timeoutMode, timeout, timeout)
val isOpen = port.openPort()
if(!isOpen) {
error(s"Port $comPort could not be opened. Use the following documentation for troubleshooting: https://github.com/Fazecast/jSerialComm/wiki/Troubleshooting")
throw new Exception("Port could not be opened")
}
(reactive.streamSource(port.getInputStream), reactive.streamSink(port.getOutputStream))
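When I saw this high CPU usage, I immediately ran a profiler (VisualVM) against it, which told me the following:
After googling Unsafe.park, I found an answer about it. Using this information, I checked the number of context switches with and without my power sensor application running, and the results made the root cause of the problem very clear: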
pi@dex:~ $ vmstat 1
procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----
r b swpd free buff cache si so bi bo in cs us sy id wa st
10 0 32692 80144 71228 264356 0 0 0 5 7 8 38 5 55 2 0
1 0 32692 80176 71228 264356 0 0 0 76 12932 18856 59 6 35 0 0
1 0 32692 80208 71228 264356 0 0 0 0 14111 20570 60 8 32 0 0
1 0 32692 80208 71228 264356 0 0 0 0 13186 16095 65 6 29 0 0
1 0 32692 80176 71228 264356 0 0 0 0 14008 23449 56 6 38 0 0
3 0 32692 80208 71228 264356 0 0 0 0 13528 17783 65 6 29 0 0
1 0 32692 80208 71228 264356 0 0 0 28 12960 16588 63 6 31 0 0
pi@dex:~ $ vmstat 1
procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----
r b swpd free buff cache si so bi bo in cs us sy id wa st
1 0 32692 147320 71228 264332 0 0 0 5 7 8 38 5 55 2 0
0 0 32692 147296 71228 264332 0 0 0 84 963 1366 0 0 98 2 0
0 0 32692 147296 71228 264332 0 0 0 0 962 1347 1 0 99 0 0
0 0 32692 147296 71228 264332 0 0 0 0 947 1318 1 0 99 0 0
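As you can see, the number of context switches dropped by about 12,000 per second just by killing my application. I went on to check which exact threads were doing this, and it seems Akka is really eager to do something:
Both a comment here and on another SO question pointed towards tweaking Akka's parallelism settings. I added the following to my application.conf - to no avail.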
akka {
log-config-on-start = "on"
actor{
default-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
default-executor {
fallback = "fork-join-executor"
}
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 1
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 1.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 1
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 1
}
}
stream{
default-blocking-io-dispatcher {
type = PinnedDispatcher
executor = "fork-join-executor"
throughput = 1
thread-pool-executor {
core-pool-size-min = 1
core-pool-size-factor = 1.0
core-pool-size-max = 1
}
fork-join-executor {
parallelism-min = 1
parallelism-factor = 1.0
parallelism-max = 1
}
}
}
}
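This seems to have improved the CPU usage (100% -> 65%), but it is still too high.
UPDATE 21-11-'16
It appears the problem is inside my graph. When the graph is not running, the CPU usage immediately drops to normal levels. The graph is the following: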
val streamGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val responsePacketSource = serialSource
.via(Framing.delimiter(ByteString(frameDelimiter), maxFrameLength, allowTruncation = true))
.via(cleanPacket)
.via(printOutput("Received: ",debug(_)))
.via(byteStringToResponse)
val packetSink = pushSource
.via(throttle(throttle))
val zipRequestStickResponse = builder.add(Zip[RequestPacket, ResponsePacket])
val broadcastRequest = builder.add(Broadcast[RequestPacket](2))
val broadcastResponse = builder.add(Broadcast[ResponsePacket](2))
packetSink ~> broadcastRequest.in
broadcastRequest.out(0) ~> makePacket ~> printOutput("Sent: ",debug(_)) ~> serialSink
broadcastRequest.out(1) ~> zipRequestStickResponse.in0
responsePacketSource ~> broadcastResponse.in
broadcastResponse.out(0).filter(isStickAck) ~> zipRequestStickResponse.in1
broadcastResponse.out(1).filter(!isStickAck(_)).map (al => {
val e = completeRequest(al)
debug(s"Sinking: $e")
e
}) ~> Sink.ignore
zipRequestStickResponse.out.map { case(request, stickResponse) =>
debug(s"Mapping: request=$request, stickResponse=$stickResponse")
pendingPackets += stickResponse.sequenceNumber -> request
request.stickResponse trySuccess stickResponse
} ~> Sink.ignore
ClosedShape
})
streamGraph.run()
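When I remove the filters from broadcastResponse, the CPU usage drops to normal levels. This leads me to believe that the zip never happens, and that the graph therefore ends up in an incorrect state.
The problem is that Fazecast's jSerialComm library has a number of different timeout modes.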
static final public int TIMEOUT_NONBLOCKING = 0x00000000;
static final public int TIMEOUT_READ_SEMI_BLOCKING = 0x00000001;
static final public int TIMEOUT_WRITE_SEMI_BLOCKING = 0x00000010;
static final public int TIMEOUT_READ_BLOCKING = 0x00000100;
static final public int TIMEOUT_WRITE_BLOCKING = 0x00001000;
static final public int TIMEOUT_SCANNER = 0x00010000;
Using the non-blocking read() method (TIMEOUT_NONBLOCKING) in combination with Akka Stream's InputStreamPublisher results in very high CPU usage. To prevent this, simply use TIMEOUT_READ_SEMI_BLOCKING or TIMEOUT_READ_BLOCKING.
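To illustrate why a non-blocking read can keep a reader thread spinning, here is a minimal, self-contained sketch using only the Java standard library. It does not use jSerialComm or Akka. NonBlockingStub and PollingDemo are hypothetical names, and the stub only assumes that a non-blocking stream returns 0 immediately while no data is available. The drain loop is a simplified stand-in for a publisher that keeps reading until end of stream, not Akka's actual implementation.

```scala
import java.io.{ByteArrayInputStream, InputStream}

// Hypothetical stand-in for a serial stream opened in non-blocking mode:
// read returns 0 immediately while no data is available, instead of waiting.
final class NonBlockingStub(emptyReads: Int, payload: Array[Byte]) extends InputStream {
  private var remainingEmpty = emptyReads
  private val data = new ByteArrayInputStream(payload)

  override def read(): Int = data.read()

  override def read(buf: Array[Byte], off: Int, len: Int): Int =
    if (remainingEmpty > 0) { remainingEmpty -= 1; 0 } // nothing yet, return at once
    else data.read(buf, off, len)                      // data, or -1 at end of stream
}

object PollingDemo {
  // Reads until end of stream the way a simple publisher loop would:
  // every result other than -1 is treated as a chunk and the loop continues.
  // Returns (number of read calls, number of bytes received).
  def drain(in: InputStream): (Int, Int) = {
    val buf = new Array[Byte](16)
    var calls = 0
    var bytes = 0
    var n = in.read(buf)
    calls += 1
    while (n != -1) {
      bytes += n
      n = in.read(buf)
      calls += 1
    }
    (calls, bytes)
  }

  def main(args: Array[String]): Unit = {
    val payload = "ACK\r\n".getBytes("US-ASCII")
    val (spinCalls, spinBytes) = drain(new NonBlockingStub(10000, payload))
    val (calls, bytes) = drain(new ByteArrayInputStream(payload))
    println(s"non-blocking stub: $spinCalls read calls for $spinBytes bytes")
    println(s"plain stream:      $calls read calls for $bytes bytes")
  }
}
```

In this sketch the stub needs 10002 read calls to deliver 5 bytes, while the plain stream needs 2. On a real port the empty reads never run out while the line is idle, so the loop burns CPU continuously. In the port setup from the question, the corresponding change would be to pass SerialPort.TIMEOUT_READ_SEMI_BLOCKING (or SerialPort.TIMEOUT_READ_BLOCKING) as timeoutMode in port.setComPortTimeouts(timeoutMode, timeout, timeout).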