演员内部的异步 API 调用和异常

Async API call inside an actor and exceptions

我知道 PipeTo, but some stuff, like synchronous waiting on nested continuation, seems to go against the async & await way.

所以,我的第一个问题 [1] 是:这里是否有任何 'magic',这样我们就可以同步地等待延续中的嵌套任务并且它最终仍然是异步的?

当我们处于异步和等待差异时,如何处理失败?

让我们创建一个简单的示例:

public static class AsyncOperations
{
    public async static Task<int> CalculateAnswerAsync()
    {
        await Task.Delay(1000).ConfigureAwait(false);
        throw new InvalidOperationException("Testing!");
        //return 42;
    }

    public async static Task<string> ConvertAsync(int number)
    {
        await Task.Delay(600).ConfigureAwait(false);
        return number + " :)";
    }
}

在 'regular' 中,异步和等待方式:

var answer = await AsyncOperations.CalculateAnswerAsync();
var converted = await AsyncOperations.ConvertAsync(answer);

正如您所期望的那样,异常将从第一次操作中冒出来。

现在,让我们创建一个将处理这些异步操作的 actor。为了争论,假设 CalculateAnswerAsyncConvertAsync 应该一个接一个地作为一个完整的操作使用(类似于,例如 StreamWriter.WriteLineAsyncStreamWriter.FlushAsync如果您只想将一行写入流)。

public sealed class AsyncTestActor : ReceiveActor
{
    public sealed class Start
    {
    }

    public sealed class OperationResult
    {
        private readonly string message;

        public OperationResult(string message)
        {
            this.message = message;
        }

        public string Message
        {
            get { return message; }
        }
    }

    public AsyncTestActor()
    {
        Receive<Start>(msg =>
               {
                   AsyncOperations.CalculateAnswerAsync()
                     .ContinueWith(result =>
                            {
                                var number = result.Result;
                                var conversionTask = AsyncOperations.ConvertAsync(number);
                                conversionTask.Wait(1500);
                                return new OperationResult(conversionTask.Result);
                            })
                     .PipeTo(Self);
                });
        Receive<OperationResult>(msg => Console.WriteLine("Got " + msg.Message));
    }
}

如果没有异常,我仍然可以毫无问题地得到 Got 42 :),这让我回到 [1] 上面的 'magic' 点。 此外,示例中提供的 AttachedToParentExecuteSynchronously 标志是可选的,还是非常需要它们才能让一切按预期工作?它们似乎对异常处理没有任何影响...

现在,如果 CalculateAnswerAsync 抛出异常,这意味着 result.Result 抛出 AggregateException,它几乎被吞没了。

如果可能的话,我应该在这里做什么才能使异步操作中的异常像 'regular' 异常一样使 actor 崩溃?

TPL 中错误处理的乐趣 :)

一旦任务在其自己的线程上启动 运行,其内部发生的所有事情都已经与调用者异步 - 包括错误处理

  1. 当您在 actor 中启动您的第一个 Task 时,该任务 运行 独立于您的 actor ThreadPool。这意味着您在 Task 中所做的任何事情都已经与您的 actor 异步 - 因为它 运行 在不同的线程上。这个 is why I made a Task.Wait call inside the PipeTo sample you linked to 在你 post 的顶部。对 actor 没有影响 - 它看起来像是一个长期的 运行ning 任务。
  2. 异常 - 如果您的内部任务失败,conversionTask.Result 属性 将抛出在其 运行 期间捕获的异常,因此您需要在您的内部任务中添加一些错误处理Task 以确保您的演员在出现问题时得到通知。请注意,我在这里就是这样做的:https://github.com/petabridge/akkadotnet-code-samples/blob/master/PipeTo/src/PipeTo.App/Actors/HttpDownloaderActor.cs#L117 - 如果您将异常转化为您的演员可以处理的消息:鸟儿开始歌唱,彩虹闪耀,TPL 错误不再是痛苦和痛苦的根源。
  3. 至于抛出异常时会发生什么...

Now, if the CalculateAnswerAsync throws an exception, which means that result.Result throws AggregateException, it's pretty much swallowed without a trace.

AggregateException 将包含包含在其中的内部异常列表 - TPL 具有这种聚合错误概念的原因是 (a) 你有一个任务是继续多个任务聚合在一起,即 Task.WhenAll 或 (b) 您将错误传播到 ContinueWith 链回到父级。您还可以调用 AggregateException.Flatten() 调用来更轻松地管理嵌套异常。

TPL 最佳实践 + Akka.NET

处理来自 TPL 的异常是一件令人讨厌的事情,这是真的 - 但处理它的最佳方法是 try..catch.. 异常在你的 Task 中并将它们转化为消息 类 你的演员可以处理。

Also, are the AttachedToParent and ExecuteSynchronously flags provided in an example optional, or are they pretty much required to have everything working as intended?

这主要是当您在延续上有延续时的问题 - PipeTo 自动在其自身上使用这些标志。它对错误处理的影响为零,但确保您的延续立即在与原始 Task.

相同的线程上执行

我建议仅在您进行大量嵌套延续时才使用这些标志 - 一旦您深入到 1 个延续之后,TPL 开始对它如何安排您的任务采取一些自由(事实上,像 OnlyOnCompleted 这样的标志超过 1 次继续后停止被接受。)

只是为了补充 Aaron 所说的内容。 从昨天开始,我们确实支持在使用任务调度程序时在 actors 内部进行安全的异步等待。

public class AsyncAwaitActor : ReceiveActor
{
    public AsyncAwaitActor()
    {
        Receive<string>(async m =>
        {
            await Task.Delay(TimeSpan.FromSeconds(1));
            Sender.Tell("done");
        });
    }
}

public class AskerActor : ReceiveActor
{
    public AskerActor(ActorRef other)
    {
        Receive<string>(async m =>
        {
            var res = await other.Ask(m);
            Sender.Tell(res);
        });
    }
}

public class ActorAsyncAwaitSpec : AkkaSpec
{
    [Fact]
    public async Task Actors_should_be_able_to_async_await_ask_message_loop()
    {
        var actor = Sys.ActorOf(Props.Create<AsyncAwaitActor>()
        .WithDispatcher("akka.actor.task-dispatcher"),
            "Worker");
        //IMPORTANT: you must use the akka.actor.task-dispatcher
        //otherwise async await is not safe

        var asker = Sys.ActorOf(Props.Create(() => new AskerActor(actor))
        .WithDispatcher("akka.actor.task-dispatcher"),
            "Asker");

        var res = await asker.Ask("something");
        Assert.Equal("done", res);
    }
}

这不是我们的默认调度程序,因为它的价格在 performance/throughput。 如果触发阻塞的任务(例如使用 task.Wait()task.Result),也存在死锁的风险 所以 PipeTo 模式仍然是首选方法,因为它更符合演员模型。 但是,如果您确实需要进行一些 TPL 集成,异步等待支持可以作为一个额外的工具为您提供。

此功能实际上在幕后使用 PipeTo。 它将接管每个任务并将其包装在一条特殊消息中,并将该消息传递回参与者并在参与者自己的并发上下文中执行该任务。