如何 运行 捕获的 ExecutionContext 上的异步委托

How to run an async delegate on captured ExecutionContext

正如 Stephen Toub 在 this post 中解释的那样,当您向 ActionBlock 提交消息时,您可以 ExecutionContext.Capture 在调用 ActionBlock.Post 之前,将包含消息和 ExecutionContext 的 DTO 传递到块,然后在消息处理委托内部使用 ExecutionContext.Run 到 运行 捕获上下文中的委托:

public sealed class ContextFlowProcessor<T> {
    private struct MessageState {
        internal ExecutionContext Context;
        internal T Value;
    }

    private readonly ITargetBlock<MessageState> m_block;

    public ContextFlowProcessor(Action<T> action) {
        m_block = new ActionBlock<MessageState>(ms =>
        {
            if (ms.Context != null)
                using (ms.Context) ExecutionContext.Run(ms.Context, s => action((T)s), ms.Value);
            else 
                action(ms.Value);
        });
    }

    public bool Post(T item) {
        var ec = ExecutionContext.Capture();
        var rv = m_block.Post(new MessageState { Context = ec, Value = item });
        if (!rv) ec.Dispose();
        return rv;
    }

    public void Done() { m_block.DeclinePermanently(); }

    public Task CompletionTask { get { return m_block.CompletionTask; } }

当消息处理程序中的逻辑是同步的时,这很有效。但是我如何在捕获的 ExecutionContext 上 运行 一段 async 逻辑?我需要这样的东西:

m_block = new ActionBlock<MessageState>(async ms =>
{
      // omitting the null context situation for brevity
      using (ms.Context)
      {
         await ExecutionContext.Run(ms.Context, async _ => { callSomethingAsync(ms.Value) });
      }
});

显然,这不会编译,因为 ExecutionContext.Run 不支持异步委托(而 ActionBlock 支持)- 那么我该怎么做呢?

如果您能提供一个独立的示例,以便我们尝试重现该问题,我们或许能够提供更好的答案。也就是说,可以使用简单的自定义同步上下文手动控制 ExecutionContext(或者更确切地说,它的副本)跨 await 延续的流程。这是一个示例(警告 - 几乎未经测试!):

// using EcFlowingSynchronizationContext:

m_block = new ActionBlock<MessageState>(async ms =>
{
      using (ms.Context)
      using (var sc = new EcFlowingSynchronizationContext(ms.Context))
      {
         await sc.Run(async _ => { await callSomethingAsync(ms.Value); });
      }
});

// EcFlowingSynchronizationContext: flow execution context manually 

public class EcFlowingSynchronizationContext : SynchronizationContext, IDisposable
{
    private readonly ExecutionContext _ec;
    private readonly TaskScheduler _taskScheduler;

    public EcFlowingSynchronizationContext(ExecutionContext sourceEc) 
    {
        TaskScheduler ts = null;
        ExecutionContext ec = null;

        ExecutionContext.Run(sourceEc, _ =>
        {
            var sc = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(this);
            try
            {
                ts = TaskScheduler.FromCurrentSynchronizationContext();
                // this will also capture SynchronizationContext.Current,
                // and it will be flown by subsequent ExecutionContext.Run
                ec = ExecutionContext.Capture();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(sc);
            }
        }, null);

        _ec = ec;
        _taskScheduler = ts;
    }

    private void Execute(SendOrPostCallback d, object state)
    {
        using (var ec = _ec.CreateCopy())
        {
            ExecutionContext.Run(ec, new ContextCallback(d), state);
        }
    }

    public Task Run(Func<Task> action, CancellationToken token = default(CancellationToken))
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, _taskScheduler).Unwrap();
    }

    public Task<TResult> Run<TResult>(Func<Task<TResult>> action, CancellationToken token = default(CancellationToken))
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, _taskScheduler).Unwrap();
    }

    public override void Post(SendOrPostCallback d, object state)
    {
        ThreadPool.UnsafeQueueUserWorkItem(s => Execute(d, s), state);
    }

    public override void Send(SendOrPostCallback d, object state)
    {
        Execute(d, state);
    }

    public override SynchronizationContext CreateCopy()
    {
        return this;
    }

    public void Dispose()
    {
        _ec.Dispose();
    }
}

请注意,您应该只使用 CallContext.LogicalSetData(或 AsyncLocal<T>)存储不可变值。即,如果您需要存储在从被调用方到调用方的异步流程中可能发生变化的某些内容,并且能够跟踪调用方的变化,请将其设为 属性 或 class 然后存储那个 class 的一个实例。确保 class 也是线程安全的,因为最终你可以有许多原始执行上下文的并发分支。

更多详细信息,请参阅 Stephen Cleary 的优秀文章 Implicit Async Context ("AsyncLocal") and "Eliding Async and Await"