在 akka.net actor 中使用 async/await 是否安全
Is it safe to use async/await inside akka.net actor
在下面的代码中,我使用了 .net 提供的语法糖,即 async/await 方法,但请注意,这不是在 akka 中处理异步操作的好方法,我宁愿使用管道到()。
public class AggregatorActor : ActorBase, IWithUnboundedStash
{
#region Constructor
public AggregatorActor(IActorSystemSettings settings, IAccountComponent component, LogSettings logSettings) : base(settings, logSettings)
{
_accountComponent = component;
_settings = settings;
}
#endregion
#region Public Methods
public override void Listening()
{
ReceiveAsync<ProfilerMessages.ProfilerBase>(async x => await HandleMessage(x));
ReceiveAsync<ProfilerMessages.TimeElasped>(async x => await HandleMessage(x));
}
public override async Task HandleMessage(object msg)
{
msg.Match().With<ProfilerMessages.GetSummary>(async x =>
{
_sender = Context.Sender;
//Become busy. Stash
Become(Busy);
//Handle different request
await HandleSummaryRequest(x.UserId, x.CasinoId, x.GamingServerId, x.AccountNumber, x.GroupName);
});
msg.Match().With<ProfilerMessages.RecurringCheck>(x =>
{
_logger.Info("Recurring Message");
if (IsAllResponsesReceived())
{
BeginAggregate();
}
});
msg.Match().With<ProfilerMessages.TimeElasped>(x =>
{
_logger.Info("Time Elapsed");
BeginAggregate();
});
}
private async Task HandleSummaryRequest(int userId, int casinoId, int gsid, string accountNumber, string groupName)
{
try
{
var accountMsg = new AccountMessages.GetAggregatedData(userId, accountNumber, casinoId, gsid);
//AskPattern.AskAsync<AccountMessages.AccountResponseAll>(Context.Self, _accountActor, accountMsg, _settings.NumberOfMilliSecondsToWaitForResponse, (x) => { return x; });
_accountActor.Tell(accountMsg);
var contactMsg = new ContactMessages.GetAggregatedContactDetails(userId);
//AskPattern.AskAsync<Messages.ContactMessages.ContactResponse>(Context.Self, _contactActor, contactMsg, _settings.NumberOfMilliSecondsToWaitForResponse, (x) => { return x; });
_contactActor.Tell(contactMsg);
var analyticMsg = new AnalyticsMessages.GetAggregatedAnalytics(userId, casinoId, gsid);
//AskPattern.AskAsync<Messages.AnalyticsMessages.AnalyticsResponse>(Context.Self, _analyticsActor, analyticMsg, _settings.NumberOfMilliSecondsToWaitForResponse, (x) => { return x; });
_analyticsActor.Tell(analyticMsg);
var financialMsg = new FinancialMessages.GetAggregatedFinancialDetails(userId.ToString());
//AskPattern.AskAsync<Messages.FinancialMessages.FinancialResponse>(Context.Self, _financialActor, financialMsg, _settings.NumberOfMilliSecondsToWaitForResponse, (x) => { return x; });
_financialActor.Tell(financialMsg);
var verificationMsg = VerificationMessages.GetAggregatedVerification.Instance(groupName, casinoId.ToString(), userId.ToString(), gsid);
_verificationActor.Tell(verificationMsg);
var riskMessage = RiskMessages.GeAggregatedRiskDetails.Instance(userId, accountNumber, groupName, casinoId, gsid);
_riskActor.Tell(riskMessage);
_cancelable = Context.System.Scheduler.ScheduleTellOnceCancelable(TimeSpan.FromMilliseconds(_settings.AggregatorTimeOut), Self, Messages.ProfilerMessages.TimeElasped.Instance(), Self);
_cancelRecurring = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(_settings.RecurringResponseCheck, _settings.RecurringResponseCheck, Self, Messages.ProfilerMessages.RecurringCheck.Instance(), Self);
}
catch (Exception ex)
{
ExceptionHandler(ex);
}
}
#endregion
}
如您在示例代码中所见,我正在使用 async/await,并使用 Akka.net 提供的 ReceiveAsync() 方法。
如果我们不能在 actor 中使用 async/await,ReceiveAsync() 的目的是什么?
您可以在 actor 中使用 async/await,但这需要对 suspend/resume actor 的邮箱进行一些必要的编排,直到异步任务完成. 这使得 actor 不可重入,这意味着它不会选择任何新消息,直到当前任务完成。要在 actor 中使用 async/await,您可以:
- 使用可以采用异步处理程序的
ReceiveAsync
。
- 用
ActorTaskScheduler.RunTask
包装您的异步方法调用。这通常在 actor 生命周期方法的上下文中很有用(例如 PreStart
/PostStop
)。
请记住,如果使用默认的 actor 消息调度程序,这将有效,但如果 actor 配置为使用不同类型的调度程序,则不能保证有效。
另外,在 actor 内部使用 async/await 也会带来性能下降,这与 suspend/resume 机制和 actor 缺乏可重入性有关。在许多业务案例中,这并不是真正的问题,但有时可能会成为 high-performance/low-latency 工作流程中的问题。
在下面的代码中,我使用了 .net 提供的语法糖,即 async/await 方法,但请注意,这不是在 akka 中处理异步操作的好方法,我宁愿使用管道到()。
public class AggregatorActor : ActorBase, IWithUnboundedStash
{
#region Constructor
public AggregatorActor(IActorSystemSettings settings, IAccountComponent component, LogSettings logSettings) : base(settings, logSettings)
{
_accountComponent = component;
_settings = settings;
}
#endregion
#region Public Methods
public override void Listening()
{
ReceiveAsync<ProfilerMessages.ProfilerBase>(async x => await HandleMessage(x));
ReceiveAsync<ProfilerMessages.TimeElasped>(async x => await HandleMessage(x));
}
public override async Task HandleMessage(object msg)
{
msg.Match().With<ProfilerMessages.GetSummary>(async x =>
{
_sender = Context.Sender;
//Become busy. Stash
Become(Busy);
//Handle different request
await HandleSummaryRequest(x.UserId, x.CasinoId, x.GamingServerId, x.AccountNumber, x.GroupName);
});
msg.Match().With<ProfilerMessages.RecurringCheck>(x =>
{
_logger.Info("Recurring Message");
if (IsAllResponsesReceived())
{
BeginAggregate();
}
});
msg.Match().With<ProfilerMessages.TimeElasped>(x =>
{
_logger.Info("Time Elapsed");
BeginAggregate();
});
}
private async Task HandleSummaryRequest(int userId, int casinoId, int gsid, string accountNumber, string groupName)
{
try
{
var accountMsg = new AccountMessages.GetAggregatedData(userId, accountNumber, casinoId, gsid);
//AskPattern.AskAsync<AccountMessages.AccountResponseAll>(Context.Self, _accountActor, accountMsg, _settings.NumberOfMilliSecondsToWaitForResponse, (x) => { return x; });
_accountActor.Tell(accountMsg);
var contactMsg = new ContactMessages.GetAggregatedContactDetails(userId);
//AskPattern.AskAsync<Messages.ContactMessages.ContactResponse>(Context.Self, _contactActor, contactMsg, _settings.NumberOfMilliSecondsToWaitForResponse, (x) => { return x; });
_contactActor.Tell(contactMsg);
var analyticMsg = new AnalyticsMessages.GetAggregatedAnalytics(userId, casinoId, gsid);
//AskPattern.AskAsync<Messages.AnalyticsMessages.AnalyticsResponse>(Context.Self, _analyticsActor, analyticMsg, _settings.NumberOfMilliSecondsToWaitForResponse, (x) => { return x; });
_analyticsActor.Tell(analyticMsg);
var financialMsg = new FinancialMessages.GetAggregatedFinancialDetails(userId.ToString());
//AskPattern.AskAsync<Messages.FinancialMessages.FinancialResponse>(Context.Self, _financialActor, financialMsg, _settings.NumberOfMilliSecondsToWaitForResponse, (x) => { return x; });
_financialActor.Tell(financialMsg);
var verificationMsg = VerificationMessages.GetAggregatedVerification.Instance(groupName, casinoId.ToString(), userId.ToString(), gsid);
_verificationActor.Tell(verificationMsg);
var riskMessage = RiskMessages.GeAggregatedRiskDetails.Instance(userId, accountNumber, groupName, casinoId, gsid);
_riskActor.Tell(riskMessage);
_cancelable = Context.System.Scheduler.ScheduleTellOnceCancelable(TimeSpan.FromMilliseconds(_settings.AggregatorTimeOut), Self, Messages.ProfilerMessages.TimeElasped.Instance(), Self);
_cancelRecurring = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(_settings.RecurringResponseCheck, _settings.RecurringResponseCheck, Self, Messages.ProfilerMessages.RecurringCheck.Instance(), Self);
}
catch (Exception ex)
{
ExceptionHandler(ex);
}
}
#endregion
}
如您在示例代码中所见,我正在使用 async/await,并使用 Akka.net 提供的 ReceiveAsync() 方法。
如果我们不能在 actor 中使用 async/await,ReceiveAsync() 的目的是什么?
您可以在 actor 中使用 async/await,但这需要对 suspend/resume actor 的邮箱进行一些必要的编排,直到异步任务完成. 这使得 actor 不可重入,这意味着它不会选择任何新消息,直到当前任务完成。要在 actor 中使用 async/await,您可以:
- 使用可以采用异步处理程序的
ReceiveAsync
。 - 用
ActorTaskScheduler.RunTask
包装您的异步方法调用。这通常在 actor 生命周期方法的上下文中很有用(例如PreStart
/PostStop
)。
请记住,如果使用默认的 actor 消息调度程序,这将有效,但如果 actor 配置为使用不同类型的调度程序,则不能保证有效。
另外,在 actor 内部使用 async/await 也会带来性能下降,这与 suspend/resume 机制和 actor 缺乏可重入性有关。在许多业务案例中,这并不是真正的问题,但有时可能会成为 high-performance/low-latency 工作流程中的问题。