我怎样才能防止我的 Akka.net actor 阻塞我的整个 actor 系统?
How can I prevent my Akka.net actor from blocking my entire actor system?
我有一个使用 6 个演员的 C# Akka.net 项目。一个参与者 (LoggingActor) 负责获取对象并将它们传输到 Web 服务器。
每当 LoggingActor 遇到网络故障时,它似乎会成为我整个 actor 系统的瓶颈;一切都会停止,直到 LoggingActor 完成其方法。
LoggingActor 中的阻塞方法包含:
try
{
var rs = new RemoteServer();
var rsr = rs.SendMovement(editedMovement, _regionConfig.SiteToken, _userConfig.ServerUrl)
.Result;
if (rsr != HttpStatusCode.OK)
{
Log("Movement POST failed:" + rsr, LogLevel.Error, "LoggingActor");
}
}
catch (HttpRequestException httpEx)
{
Log("Buffering movement for re-send", LogLevel.Warn, "LoggingActor");
MovementTransmitBuffer.Add(editedMovement);
}
SendMovement 函数如下所示:
public async Task<HttpStatusCode> SendMovement(Movement movement, string siteToken, string serverUrl)
{
//Get remote server
var createMovementUrl = serverUrl + MovementsRoute + "?site_token=" + siteToken;
var jsonString = "{ \"movement\": { \"approach\": \"" + movement.Approach + "\", \"exit\": \"" + movement.Exit + "\", \"turntype\": \"" + movement.TurnType + "\", \"objectclass\": \"" + movement.TrafficObjectType + "\", \"created_at\": " + movement.Timestamp.Ticks + "} }";
var content = new StringContent(jsonString, Encoding.UTF8, "application/json");
//Transmit
var response = await Client.PostAsync(createMovementUrl, content);
return response.StatusCode;
}
LoggingActor 的目的是尝试 POST 我的对象到服务器,并缓冲它们以便在初始尝试失败时稍后发送。对于我的应用程序,它们的传输顺序无关紧要。
如何修改 LoggingActor 或 SendMovement 函数以允许 LoggingActor 'fall through' LoggingActor 的消息处理函数 通过在之前发送 POST 请求等待回复?
我想我想要的是服务器的 POST 响应(或异常)触发另一条消息给演员,告诉演员要么从发送队列中删除它,要么保留它(即什么也不做)。
您正在调用 rs.SendMovement
,其中 return 是一个任务,您正在对其调用 Result
属性。这将锁定当前线程池中的整个线程 - 因为线程池在所有 actors/tasks 之间共享,并且通常每个 CPU 核心包含 1 个工作线程。这意味着,您已经有效地关闭了整个 CPU 核心,直到远程端响应(或超时)。
一般来说,在使用异步代码时永远不要阻塞线程。在 Akka.NET 中,使用基于任务的 API 可以通过两种方式中的一种来完成。
使用ReceiveAsync
第一个解决方案是让您的 actor 继承自 ReceiveActor
并使用 ReceiveAsync
方法来定义您的消息处理程序:
class MyActor : ReceiveActor
{
public MyActor()
{
ReceiveAsync<MyMessage>(async message => {
try
{
var rs = new RemoteServer();
var rsr = await rs.SendMovement(editedMovement, _regionConfig.SiteToken, _userConfig.ServerUrl);
if (rsr != HttpStatusCode.OK)
{
Log("Movement POST failed:" + rsr, LogLevel.Error, "LoggingActor");
}
}
catch (HttpRequestException httpEx)
{
Log("Buffering movement for re-send", LogLevel.Warn, "LoggingActor");
MovementTransmitBuffer.Add(editedMovement);
}
});
}
}
这将创建一个称为 不可重入 actor 的东西 - 这意味着,虽然您不会阻塞任何底层线程(以便它仍然可以被其他 actor 使用同时工作),您将阻止当前 actor,阻止它接收任何其他消息,直到当前异步 lambda 处理程序到达终点。
使用PipeTo
另一种方法——默认情况下在几乎所有 akka 内部架构中使用——是在任务上使用 PipeTo
方法:
class MyActor : ActorBase
{
// start with an actor in ready state
protected override bool Receive(object message) => Ready(message);
bool Ready(object message)
{
switch (message)
{
case MyMessage msg:
var rs = new RemoteServer();
rs.SendMovement(editedMovement, _regionConfig.SiteToken, _userConfig.ServerUrl)
.PipeTo(Self,
success: rsr => new Status.Success(rsr),
failure: ex => new Status.Failure(ex));
Become(Waiting);
return true;
default: return false;
}
}
bool Waiting(object message)
{
switch (message)
{
case Status.Success success:
var rsr = (HttpStatusCode)success.Status;
if (rsr != HttpStatusCode.OK)
{
Log("Movement POST failed:" + rsr, LogLevel.Error, "LoggingActor");
}
Become(Ready);
return true;
case Status.Failure failure:
Log("Buffering movement for re-send", LogLevel.Warn, "LoggingActor");
MovementTransmitBuffer.Add(editedMovement);
Become(Ready);
return true;
default: return false;
}
}
}
通过这种方式,您可以将 actor 变成自定义状态机。此处负责等待逻辑的代码片段分为两步——此处表示为 Ready
/Waiting
状态。发送异步请求后,actor 正在改变其行为——这意味着他现在可以处理不同的传入消息集或对它们做出不同的反应。返回的布尔值通知参与者系统消息是否被当前参与者处理 - 这可能会触发未处理的调用,默认情况下将记录未处理的消息。
这种方法的优点之一是这个 actor 是 可重入的 - 这意味着,在等待 SendMovement
到 return 结果的同时(或失败),此 actor 可以自由接收和处理其他消息。
我有一个使用 6 个演员的 C# Akka.net 项目。一个参与者 (LoggingActor) 负责获取对象并将它们传输到 Web 服务器。
每当 LoggingActor 遇到网络故障时,它似乎会成为我整个 actor 系统的瓶颈;一切都会停止,直到 LoggingActor 完成其方法。
LoggingActor 中的阻塞方法包含:
try
{
var rs = new RemoteServer();
var rsr = rs.SendMovement(editedMovement, _regionConfig.SiteToken, _userConfig.ServerUrl)
.Result;
if (rsr != HttpStatusCode.OK)
{
Log("Movement POST failed:" + rsr, LogLevel.Error, "LoggingActor");
}
}
catch (HttpRequestException httpEx)
{
Log("Buffering movement for re-send", LogLevel.Warn, "LoggingActor");
MovementTransmitBuffer.Add(editedMovement);
}
SendMovement 函数如下所示:
public async Task<HttpStatusCode> SendMovement(Movement movement, string siteToken, string serverUrl)
{
//Get remote server
var createMovementUrl = serverUrl + MovementsRoute + "?site_token=" + siteToken;
var jsonString = "{ \"movement\": { \"approach\": \"" + movement.Approach + "\", \"exit\": \"" + movement.Exit + "\", \"turntype\": \"" + movement.TurnType + "\", \"objectclass\": \"" + movement.TrafficObjectType + "\", \"created_at\": " + movement.Timestamp.Ticks + "} }";
var content = new StringContent(jsonString, Encoding.UTF8, "application/json");
//Transmit
var response = await Client.PostAsync(createMovementUrl, content);
return response.StatusCode;
}
LoggingActor 的目的是尝试 POST 我的对象到服务器,并缓冲它们以便在初始尝试失败时稍后发送。对于我的应用程序,它们的传输顺序无关紧要。
如何修改 LoggingActor 或 SendMovement 函数以允许 LoggingActor 'fall through' LoggingActor 的消息处理函数 通过在之前发送 POST 请求等待回复?
我想我想要的是服务器的 POST 响应(或异常)触发另一条消息给演员,告诉演员要么从发送队列中删除它,要么保留它(即什么也不做)。
您正在调用 rs.SendMovement
,其中 return 是一个任务,您正在对其调用 Result
属性。这将锁定当前线程池中的整个线程 - 因为线程池在所有 actors/tasks 之间共享,并且通常每个 CPU 核心包含 1 个工作线程。这意味着,您已经有效地关闭了整个 CPU 核心,直到远程端响应(或超时)。
一般来说,在使用异步代码时永远不要阻塞线程。在 Akka.NET 中,使用基于任务的 API 可以通过两种方式中的一种来完成。
使用ReceiveAsync
第一个解决方案是让您的 actor 继承自 ReceiveActor
并使用 ReceiveAsync
方法来定义您的消息处理程序:
class MyActor : ReceiveActor
{
public MyActor()
{
ReceiveAsync<MyMessage>(async message => {
try
{
var rs = new RemoteServer();
var rsr = await rs.SendMovement(editedMovement, _regionConfig.SiteToken, _userConfig.ServerUrl);
if (rsr != HttpStatusCode.OK)
{
Log("Movement POST failed:" + rsr, LogLevel.Error, "LoggingActor");
}
}
catch (HttpRequestException httpEx)
{
Log("Buffering movement for re-send", LogLevel.Warn, "LoggingActor");
MovementTransmitBuffer.Add(editedMovement);
}
});
}
}
这将创建一个称为 不可重入 actor 的东西 - 这意味着,虽然您不会阻塞任何底层线程(以便它仍然可以被其他 actor 使用同时工作),您将阻止当前 actor,阻止它接收任何其他消息,直到当前异步 lambda 处理程序到达终点。
使用PipeTo
另一种方法——默认情况下在几乎所有 akka 内部架构中使用——是在任务上使用 PipeTo
方法:
class MyActor : ActorBase
{
// start with an actor in ready state
protected override bool Receive(object message) => Ready(message);
bool Ready(object message)
{
switch (message)
{
case MyMessage msg:
var rs = new RemoteServer();
rs.SendMovement(editedMovement, _regionConfig.SiteToken, _userConfig.ServerUrl)
.PipeTo(Self,
success: rsr => new Status.Success(rsr),
failure: ex => new Status.Failure(ex));
Become(Waiting);
return true;
default: return false;
}
}
bool Waiting(object message)
{
switch (message)
{
case Status.Success success:
var rsr = (HttpStatusCode)success.Status;
if (rsr != HttpStatusCode.OK)
{
Log("Movement POST failed:" + rsr, LogLevel.Error, "LoggingActor");
}
Become(Ready);
return true;
case Status.Failure failure:
Log("Buffering movement for re-send", LogLevel.Warn, "LoggingActor");
MovementTransmitBuffer.Add(editedMovement);
Become(Ready);
return true;
default: return false;
}
}
}
通过这种方式,您可以将 actor 变成自定义状态机。此处负责等待逻辑的代码片段分为两步——此处表示为 Ready
/Waiting
状态。发送异步请求后,actor 正在改变其行为——这意味着他现在可以处理不同的传入消息集或对它们做出不同的反应。返回的布尔值通知参与者系统消息是否被当前参与者处理 - 这可能会触发未处理的调用,默认情况下将记录未处理的消息。
这种方法的优点之一是这个 actor 是 可重入的 - 这意味着,在等待 SendMovement
到 return 结果的同时(或失败),此 actor 可以自由接收和处理其他消息。