Akka .NET PipeTo() 与 ContinueWith()
Akka .NET PipeTo() with ContinueWith()
The Top 7 Mistakes Newbies Make with Akka.NET 解释了为什么在 actor 中使用 async/await 通常是个坏主意:
[...] we see end users develop lots of nested async / await operations inside an individual message handler. There’s a cost overlooked by most users to doing this: the actor can’t process any other messages between each await operation because those awaits are still part of the “1 message at a time” guarantee for the original message!
然而在 Petabridge Akka .NET 训练营的 Unit 3 Lesson 4 中,这个例子被认为是好的:
// asynchronously download the image and pipe the results to ourself
_httpClient.GetAsync(imageUrl).ContinueWith(httpRequest =>
{
var response = httpRequest.Result;
// successful img download
if (response.StatusCode == HttpStatusCode.OK)
{
var contentStream = response.Content.ReadAsStreamAsync();
try
{
contentStream.Wait(TimeSpan.FromSeconds(1));
return new ImageDownloadResult(image,
response.StatusCode, contentStream.Result);
}
catch //timeout exceptions!
{
return new ImageDownloadResult(image, HttpStatusCode.PartialContent);
}
}
return new ImageDownloadResult(image, response.StatusCode);
},
TaskContinuationOptions.ExecuteSynchronously)
.PipeTo(Self);
按照我的理解,在 GetAsync()
和 ContinueWith()
完成之前,演员将无法处理任何其他消息,这正是 PipeTo()
试图解决的问题避免。
我是不是漏掉了什么?
我认为正在发生的事情是,由于没有等待 GetAsync()
,所以整个过程异步触发并且不会阻止执行。
在所有这些发生的同时,参与者可以自由处理其他消息。
TaskContinuationOptions.ExecuteSynchronously
表示 ContinueWith
将 运行 与转换任务的代码同步完成 (TaskContinuationOptions)。
示例中的代码将触发任务,设置 ContinueWith()
和 PipeTo()
,以及 return。 Actor 可以自由接收新消息,当任务完成时,PipeTo()
将向其发送结果消息。
正如 Gigi 提到的,如果 Task 在 actor 中等待,那么它将被阻塞。
认为 ContinueWith()
也是一项任务可能会有所帮助,这就是 PipeTo()
的作用。
这是 Akka.net Github 中的 PipeTo 扩展:
public static Task PipeTo<T>(this Task<T> taskToPipe, ICanTell recipient, IActorRef sender = null, Func<T, object> success = null, Func<Exception, object> failure = null)
{
sender = sender ?? ActorRefs.NoSender;
return taskToPipe.ContinueWith(tresult =>
{
if (tresult.IsCanceled || tresult.IsFaulted)
recipient.Tell(failure != null
? failure(tresult.Exception)
: new Status.Failure(tresult.Exception), sender);
else if (tresult.IsCompleted)
recipient.Tell(success != null
? success(tresult.Result)
: tresult.Result, sender);
}, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
The Top 7 Mistakes Newbies Make with Akka.NET 解释了为什么在 actor 中使用 async/await 通常是个坏主意:
[...] we see end users develop lots of nested async / await operations inside an individual message handler. There’s a cost overlooked by most users to doing this: the actor can’t process any other messages between each await operation because those awaits are still part of the “1 message at a time” guarantee for the original message!
然而在 Petabridge Akka .NET 训练营的 Unit 3 Lesson 4 中,这个例子被认为是好的:
// asynchronously download the image and pipe the results to ourself
_httpClient.GetAsync(imageUrl).ContinueWith(httpRequest =>
{
var response = httpRequest.Result;
// successful img download
if (response.StatusCode == HttpStatusCode.OK)
{
var contentStream = response.Content.ReadAsStreamAsync();
try
{
contentStream.Wait(TimeSpan.FromSeconds(1));
return new ImageDownloadResult(image,
response.StatusCode, contentStream.Result);
}
catch //timeout exceptions!
{
return new ImageDownloadResult(image, HttpStatusCode.PartialContent);
}
}
return new ImageDownloadResult(image, response.StatusCode);
},
TaskContinuationOptions.ExecuteSynchronously)
.PipeTo(Self);
按照我的理解,在 GetAsync()
和 ContinueWith()
完成之前,演员将无法处理任何其他消息,这正是 PipeTo()
试图解决的问题避免。
我是不是漏掉了什么?
我认为正在发生的事情是,由于没有等待 GetAsync()
,所以整个过程异步触发并且不会阻止执行。
在所有这些发生的同时,参与者可以自由处理其他消息。
TaskContinuationOptions.ExecuteSynchronously
表示 ContinueWith
将 运行 与转换任务的代码同步完成 (TaskContinuationOptions)。
示例中的代码将触发任务,设置 ContinueWith()
和 PipeTo()
,以及 return。 Actor 可以自由接收新消息,当任务完成时,PipeTo()
将向其发送结果消息。
正如 Gigi 提到的,如果 Task 在 actor 中等待,那么它将被阻塞。
认为 ContinueWith()
也是一项任务可能会有所帮助,这就是 PipeTo()
的作用。
这是 Akka.net Github 中的 PipeTo 扩展:
public static Task PipeTo<T>(this Task<T> taskToPipe, ICanTell recipient, IActorRef sender = null, Func<T, object> success = null, Func<Exception, object> failure = null)
{
sender = sender ?? ActorRefs.NoSender;
return taskToPipe.ContinueWith(tresult =>
{
if (tresult.IsCanceled || tresult.IsFaulted)
recipient.Tell(failure != null
? failure(tresult.Exception)
: new Status.Failure(tresult.Exception), sender);
else if (tresult.IsCompleted)
recipient.Tell(success != null
? success(tresult.Result)
: tresult.Result, sender);
}, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}