Akka.NET 在 CPU 处于高压状态时持续丢弃消息?
Akka.NET with persistence dropping messages when CPU in under high pressure?
我对我的 PoC 进行了一些性能测试。我看到的是我的演员没有收到发送给他的所有消息,性能非常低。我向我的应用程序发送了大约 15 万条消息,这导致我的处理器达到 100% 利用率的峰值。但是当我停止发送请求时,2/3 的消息没有传递给演员。以下是来自应用洞察的简单指标:
为了证明我在 mongo 中持续存在的事件数量几乎与我的演员收到的消息数量相同。
其次,处理消息的性能非常令人失望。我每秒收到大约 300 条消息。
我知道 Akka.NET 默认情况下消息传递最多一次,但我没有收到任何错误消息说该消息已被丢弃。
代码如下:
集群分片注册:
services.AddSingleton<ValueCoordinatorProvider>(provider =>
{
var shardRegion = ClusterSharding.Get(_actorSystem).Start(
typeName: "values-actor",
entityProps: _actorSystem.DI().Props<ValueActor>(),
settings: ClusterShardingSettings.Create(_actorSystem),
messageExtractor: new ValueShardMsgRouter());
return () => shardRegion;
});
控制器:
[ApiController]
[Route("api/[controller]")]
public class ValueController : ControllerBase
{
private readonly IActorRef _valueCoordinator;
public ValueController(ValueCoordinatorProvider valueCoordinatorProvider)
{
_valueCoordinator = valuenCoordinatorProvider();
}
[HttpPost]
public Task<IActionResult> PostAsync(Message message)
{
_valueCoordinator.Tell(message);
return Task.FromResult((IActionResult)Ok());
}
}
演员:
public class ValueActor : ReceivePersistentActor
{
public override string PersistenceId { get; }
private decimal _currentValue;
public ValueActor()
{
PersistenceId = Context.Self.Path.Name;
Command<Message>(Handle);
}
private void Handle(Message message)
{
Context.IncrementMessagesReceived();
var accepted = new ValueAccepted(message.ValueId, message.Value);
Persist(accepted, valueAccepted =>
{
_currentValue = valueAccepted.BidValue;
});
}
}
消息路由器。
public sealed class ValueShardMsgRouter : HashCodeMessageExtractor
{
public const int DefaultShardCount = 1_000_000_000;
public ValueShardMsgRouter() : this(DefaultShardCount)
{
}
public ValueShardMsgRouter(int maxNumberOfShards) : base(maxNumberOfShards)
{
}
public override string EntityId(object message)
{
return message switch
{
IWithValueId valueMsg => valueMsg.ValueId,
_ => null
};
}
}
akka.conf
akka {
stdout-loglevel = ERROR
loglevel = ERROR
actor {
debug {
unhandled = on
}
provider = cluster
serializers {
hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
}
serialization-bindings {
"System.Object" = hyperion
}
deployment {
/valuesRouter {
router = consistent-hashing-group
routees.paths = ["/values"]
cluster {
enabled = on
}
}
}
}
remote {
dot-netty.tcp {
hostname = "desktop-j45ou76"
port = 5054
}
}
cluster {
seed-nodes = ["akka.tcp://valuessystem@desktop-j45ou76:5054"]
}
persistence {
journal {
plugin = "akka.persistence.journal.mongodb"
mongodb {
class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
connection-string = "mongodb://localhost:27017/akkanet"
auto-initialize = off
plugin-dispatcher = "akka.actor.default-dispatcher"
collection = "EventJournal"
metadata-collection = "Metadata"
legacy-serialization = off
}
}
snapshot-store {
plugin = "akka.persistence.snapshot-store.mongodb"
mongodb {
class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"
connection-string = "mongodb://localhost:27017/akkanet"
auto-initialize = off
plugin-dispatcher = "akka.actor.default-dispatcher"
collection = "SnapshotStore"
legacy-serialization = off
}
}
}
}
所以这里有两个问题:演员表现和丢失的消息。
从你的文章中看不清楚,但我会做一个假设:这些消息中的 100% 都发给了一个演员。
演员表演
单个参与者的端到端吞吐量取决于:
- 将消息路由到参与者所需的工作量(即通过分片系统、层次结构、网络等)
- 演员处理单个消息所花费的时间,因为这决定了邮箱清空的速度;和
- 任何影响哪些消息可以在何时被处理的流控制——即如果一个参与者使用隐藏和行为切换,一个参与者在等待其状态改变时花费在隐藏消息上的时间量将对所有隐藏消息的端到端处理时间。
由于此列表中的第 3 项,您的表现会很差。您正在实施的设计调用 Persist
和 阻止参与者进行任何额外的处理,直到消息被成功保存 。发送给参与者的所有其他消息都存储在内部,直到前一条消息成功保存。
Akka.Persistence 从单个参与者的角度提供四种持久化消息的选项:
Persist
- 最高一致性(在确认持久性之前不能处理其他消息),最低性能;
PersistAsync
- 一致性较低,性能更高。在处理邮箱中的下一条消息之前,不等待消息被持久化。允许来自单个持久性参与者的多条消息在运行中同时处理——这些事件被持久化的顺序将被保留(因为它们被发送到内部 Akka.Persistence 日志 IActorRef
在那个顺序) 但 actor 将在确认持久化消息之前继续处理其他消息。这意味着您可能必须在调用 PersistAsync
之前 修改 actor 的内存状态 ,而不是在调用 之后。
PersistAll
- 高一致性,但一次批处理多个持久事件。与 Persist
相同的排序和控制流语义 - 但您只是将一组消息保存在一起。
PersistAllAsync
- 最高性能。与 PersistAsync
相同的语义,但它是一个数组中的一批消息被持久化在一起。
要了解 Akka.Persistence 的性能特征如何随着这些方法中的每一种而变化,请查看 Akka.NET 组织围绕 [=113] 收集的详细基准数据=].Linq2Db,新的高性能 RDBMS Akka.Persistence 库:https://github.com/akkadotnet/Akka.Persistence.Linq2Db#performance - 在 SQL 上每秒 15,000 次和每秒 250 次之间存在差异;在 MongoDB.
这样的系统中,写入性能可能更高
Akka.Persistence 的一个关键属性是它 有意 通过一组集中的“日志”和“快照”参与者路由所有持久性命令集群中的每个节点 - 因此来自多个持久性参与者的消息可以跨少量并发数据库连接一起批处理。有许多用户 运行 同时存在数十万个持久性参与者——如果每个参与者都有自己与数据库的独特连接,那么即使是地球上最强大的垂直扩展数据库实例也会融化。这种连接池/共享是各个持久性参与者依赖流量控制的原因。
使用任何持久性参与者框架(即 Orleans、Service Fabric),您会看到类似的性能,因为出于相同的原因,它们都采用了类似的设计 Akka.NET。
为了提高性能,您需要将接收到的消息一起批处理并使用 PersistAll
(将其视为 de-bouncing)将它们保存在一个组中,或者使用异步持久性语义使用 PersistAsync
.
如果您将工作负载分散到许多具有不同实体 ID 的并发参与者,您还会看到更好的总体性能 - 这样您就可以从参与者并发性和并行性中获益。
丢失的消息
发生这种情况的原因可能有多种 - 通常是以下原因:
- Actor 被终止(与重新启动不同)并将其所有消息转储到
DeadLetter
集合中;
- 网络中断导致连接中断 - 当节点处于 100% 时可能会发生这种情况 CPU - 当时排队等待传递的消息可能会被丢弃;和
- Akka.Persistence 日志从数据库接收超时将导致持久性参与者由于失去一致性而终止。
您应该在日志中查找以下内容:
DeadLetter
个警告/计数
OpenCircuitBreakerException
s 来自 Akka.Persistence
您通常会看到这两者同时出现 - 我怀疑这就是您的系统发生的情况。另一种可能性是 Akka.Remote 抛出 DisassociationException
s,我也会寻找它。
您可以通过在配置 https://getakka.net/articles/configuration/akka.cluster.html:
中更改 Akka.Cluster failure-detector
的心跳值来修复 Akka.Remote 问题
akka.cluster.failure-detector {
# FQCN of the failure detector implementation.
# It must implement akka.remote.FailureDetector and have
# a public constructor with a com.typesafe.config.Config and
# akka.actor.EventStream parameter.
implementation-class = "Akka.Remote.PhiAccrualFailureDetector, Akka.Remote"
# How often keep-alive heartbeat messages should be sent to each connection.
heartbeat-interval = 1 s
# Defines the failure detector threshold.
# A low threshold is prone to generate many wrong suspicions but ensures
# a quick detection in the event of a real crash. Conversely, a high
# threshold generates fewer mistakes but needs more time to detect
# actual crashes.
threshold = 8.0
# Number of the samples of inter-heartbeat arrival times to adaptively
# calculate the failure timeout for connections.
max-sample-size = 1000
# Minimum standard deviation to use for the normal distribution in
# AccrualFailureDetector. Too low standard deviation might result in
# too much sensitivity for sudden, but normal, deviations in heartbeat
# inter arrival times.
min-std-deviation = 100 ms
# Number of potentially lost/delayed heartbeats that will be
# accepted before considering it to be an anomaly.
# This margin is important to be able to survive sudden, occasional,
# pauses in heartbeat arrivals, due to for example garbage collect or
# network drop.
acceptable-heartbeat-pause = 3 s
# Number of member nodes that each member will send heartbeat messages to,
# i.e. each node will be monitored by this number of other nodes.
monitored-by-nr-of-members = 9
# After the heartbeat request has been sent the first failure detection
# will start after this period, even though no heartbeat mesage has
# been received.
expected-response-after = 1 s
}
如果需要,将 acceptable-heartbeat-pause = 3 s
值增加到更大的值,例如 10、20、30。
分片配置
我想用您的代码指出的最后一件事 - 分片数太高了。每个节点应该有大约 10 个分片。减少到合理的程度。
我对我的 PoC 进行了一些性能测试。我看到的是我的演员没有收到发送给他的所有消息,性能非常低。我向我的应用程序发送了大约 15 万条消息,这导致我的处理器达到 100% 利用率的峰值。但是当我停止发送请求时,2/3 的消息没有传递给演员。以下是来自应用洞察的简单指标:
为了证明我在 mongo 中持续存在的事件数量几乎与我的演员收到的消息数量相同。
其次,处理消息的性能非常令人失望。我每秒收到大约 300 条消息。
我知道 Akka.NET 默认情况下消息传递最多一次,但我没有收到任何错误消息说该消息已被丢弃。
代码如下: 集群分片注册:
services.AddSingleton<ValueCoordinatorProvider>(provider =>
{
var shardRegion = ClusterSharding.Get(_actorSystem).Start(
typeName: "values-actor",
entityProps: _actorSystem.DI().Props<ValueActor>(),
settings: ClusterShardingSettings.Create(_actorSystem),
messageExtractor: new ValueShardMsgRouter());
return () => shardRegion;
});
控制器:
[ApiController]
[Route("api/[controller]")]
public class ValueController : ControllerBase
{
private readonly IActorRef _valueCoordinator;
public ValueController(ValueCoordinatorProvider valueCoordinatorProvider)
{
_valueCoordinator = valuenCoordinatorProvider();
}
[HttpPost]
public Task<IActionResult> PostAsync(Message message)
{
_valueCoordinator.Tell(message);
return Task.FromResult((IActionResult)Ok());
}
}
演员:
public class ValueActor : ReceivePersistentActor
{
public override string PersistenceId { get; }
private decimal _currentValue;
public ValueActor()
{
PersistenceId = Context.Self.Path.Name;
Command<Message>(Handle);
}
private void Handle(Message message)
{
Context.IncrementMessagesReceived();
var accepted = new ValueAccepted(message.ValueId, message.Value);
Persist(accepted, valueAccepted =>
{
_currentValue = valueAccepted.BidValue;
});
}
}
消息路由器。
public sealed class ValueShardMsgRouter : HashCodeMessageExtractor
{
public const int DefaultShardCount = 1_000_000_000;
public ValueShardMsgRouter() : this(DefaultShardCount)
{
}
public ValueShardMsgRouter(int maxNumberOfShards) : base(maxNumberOfShards)
{
}
public override string EntityId(object message)
{
return message switch
{
IWithValueId valueMsg => valueMsg.ValueId,
_ => null
};
}
}
akka.conf
akka {
stdout-loglevel = ERROR
loglevel = ERROR
actor {
debug {
unhandled = on
}
provider = cluster
serializers {
hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
}
serialization-bindings {
"System.Object" = hyperion
}
deployment {
/valuesRouter {
router = consistent-hashing-group
routees.paths = ["/values"]
cluster {
enabled = on
}
}
}
}
remote {
dot-netty.tcp {
hostname = "desktop-j45ou76"
port = 5054
}
}
cluster {
seed-nodes = ["akka.tcp://valuessystem@desktop-j45ou76:5054"]
}
persistence {
journal {
plugin = "akka.persistence.journal.mongodb"
mongodb {
class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"
connection-string = "mongodb://localhost:27017/akkanet"
auto-initialize = off
plugin-dispatcher = "akka.actor.default-dispatcher"
collection = "EventJournal"
metadata-collection = "Metadata"
legacy-serialization = off
}
}
snapshot-store {
plugin = "akka.persistence.snapshot-store.mongodb"
mongodb {
class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"
connection-string = "mongodb://localhost:27017/akkanet"
auto-initialize = off
plugin-dispatcher = "akka.actor.default-dispatcher"
collection = "SnapshotStore"
legacy-serialization = off
}
}
}
}
所以这里有两个问题:演员表现和丢失的消息。
从你的文章中看不清楚,但我会做一个假设:这些消息中的 100% 都发给了一个演员。
演员表演
单个参与者的端到端吞吐量取决于:
- 将消息路由到参与者所需的工作量(即通过分片系统、层次结构、网络等)
- 演员处理单个消息所花费的时间,因为这决定了邮箱清空的速度;和
- 任何影响哪些消息可以在何时被处理的流控制——即如果一个参与者使用隐藏和行为切换,一个参与者在等待其状态改变时花费在隐藏消息上的时间量将对所有隐藏消息的端到端处理时间。
由于此列表中的第 3 项,您的表现会很差。您正在实施的设计调用 Persist
和 阻止参与者进行任何额外的处理,直到消息被成功保存 。发送给参与者的所有其他消息都存储在内部,直到前一条消息成功保存。
Akka.Persistence 从单个参与者的角度提供四种持久化消息的选项:
Persist
- 最高一致性(在确认持久性之前不能处理其他消息),最低性能;PersistAsync
- 一致性较低,性能更高。在处理邮箱中的下一条消息之前,不等待消息被持久化。允许来自单个持久性参与者的多条消息在运行中同时处理——这些事件被持久化的顺序将被保留(因为它们被发送到内部 Akka.Persistence 日志IActorRef
在那个顺序) 但 actor 将在确认持久化消息之前继续处理其他消息。这意味着您可能必须在调用PersistAsync
之前 修改 actor 的内存状态 ,而不是在调用 之后。
PersistAll
- 高一致性,但一次批处理多个持久事件。与Persist
相同的排序和控制流语义 - 但您只是将一组消息保存在一起。PersistAllAsync
- 最高性能。与PersistAsync
相同的语义,但它是一个数组中的一批消息被持久化在一起。
要了解 Akka.Persistence 的性能特征如何随着这些方法中的每一种而变化,请查看 Akka.NET 组织围绕 [=113] 收集的详细基准数据=].Linq2Db,新的高性能 RDBMS Akka.Persistence 库:https://github.com/akkadotnet/Akka.Persistence.Linq2Db#performance - 在 SQL 上每秒 15,000 次和每秒 250 次之间存在差异;在 MongoDB.
这样的系统中,写入性能可能更高Akka.Persistence 的一个关键属性是它 有意 通过一组集中的“日志”和“快照”参与者路由所有持久性命令集群中的每个节点 - 因此来自多个持久性参与者的消息可以跨少量并发数据库连接一起批处理。有许多用户 运行 同时存在数十万个持久性参与者——如果每个参与者都有自己与数据库的独特连接,那么即使是地球上最强大的垂直扩展数据库实例也会融化。这种连接池/共享是各个持久性参与者依赖流量控制的原因。
使用任何持久性参与者框架(即 Orleans、Service Fabric),您会看到类似的性能,因为出于相同的原因,它们都采用了类似的设计 Akka.NET。
为了提高性能,您需要将接收到的消息一起批处理并使用 PersistAll
(将其视为 de-bouncing)将它们保存在一个组中,或者使用异步持久性语义使用 PersistAsync
.
如果您将工作负载分散到许多具有不同实体 ID 的并发参与者,您还会看到更好的总体性能 - 这样您就可以从参与者并发性和并行性中获益。
丢失的消息
发生这种情况的原因可能有多种 - 通常是以下原因:
- Actor 被终止(与重新启动不同)并将其所有消息转储到
DeadLetter
集合中; - 网络中断导致连接中断 - 当节点处于 100% 时可能会发生这种情况 CPU - 当时排队等待传递的消息可能会被丢弃;和
- Akka.Persistence 日志从数据库接收超时将导致持久性参与者由于失去一致性而终止。
您应该在日志中查找以下内容:
DeadLetter
个警告/计数OpenCircuitBreakerException
s 来自 Akka.Persistence
您通常会看到这两者同时出现 - 我怀疑这就是您的系统发生的情况。另一种可能性是 Akka.Remote 抛出 DisassociationException
s,我也会寻找它。
您可以通过在配置 https://getakka.net/articles/configuration/akka.cluster.html:
中更改 Akka.Clusterfailure-detector
的心跳值来修复 Akka.Remote 问题
akka.cluster.failure-detector {
# FQCN of the failure detector implementation.
# It must implement akka.remote.FailureDetector and have
# a public constructor with a com.typesafe.config.Config and
# akka.actor.EventStream parameter.
implementation-class = "Akka.Remote.PhiAccrualFailureDetector, Akka.Remote"
# How often keep-alive heartbeat messages should be sent to each connection.
heartbeat-interval = 1 s
# Defines the failure detector threshold.
# A low threshold is prone to generate many wrong suspicions but ensures
# a quick detection in the event of a real crash. Conversely, a high
# threshold generates fewer mistakes but needs more time to detect
# actual crashes.
threshold = 8.0
# Number of the samples of inter-heartbeat arrival times to adaptively
# calculate the failure timeout for connections.
max-sample-size = 1000
# Minimum standard deviation to use for the normal distribution in
# AccrualFailureDetector. Too low standard deviation might result in
# too much sensitivity for sudden, but normal, deviations in heartbeat
# inter arrival times.
min-std-deviation = 100 ms
# Number of potentially lost/delayed heartbeats that will be
# accepted before considering it to be an anomaly.
# This margin is important to be able to survive sudden, occasional,
# pauses in heartbeat arrivals, due to for example garbage collect or
# network drop.
acceptable-heartbeat-pause = 3 s
# Number of member nodes that each member will send heartbeat messages to,
# i.e. each node will be monitored by this number of other nodes.
monitored-by-nr-of-members = 9
# After the heartbeat request has been sent the first failure detection
# will start after this period, even though no heartbeat mesage has
# been received.
expected-response-after = 1 s
}
如果需要,将 acceptable-heartbeat-pause = 3 s
值增加到更大的值,例如 10、20、30。
分片配置
我想用您的代码指出的最后一件事 - 分片数太高了。每个节点应该有大约 10 个分片。减少到合理的程度。