TaskContinuationOptions.RunContinuationsAsynchronously 和堆栈潜水

TaskContinuationOptions.RunContinuationsAsynchronously and Stack Dives

this blog post 中,Stephan Toub 描述了一个将包含在 .NET 4.6 中的新功能,它向名为 RunContinuationsAsynchronously 的 TaskCreationOptions 和 TaskContinuationOptions 枚举添加了另一个值。

他解释说:

"I talked about a ramification of calling {Try}Set* methods on TaskCompletionSource, that any synchronous continuations off of the TaskCompletionSource’s Task could run synchronously as part of the call. If we were to invoke SetResult here while holding the lock, then synchronous continuations off of that Task would be run while holding the lock, and that could lead to very real problems. So, while holding the lock we grab the TaskCompletionSource to be completed, but we don’t complete it yet, delaying doing so until the lock has been released"

并举如下例子进行演示:

private SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
private async Task WorkAsync()
{
    await _gate.WaitAsync().ConfigureAwait(false);
    try
    {
        // work here
    }
    finally { _gate.Release(); }
}

Now imagine that you have lots of calls to WorkAsync:

await Task.WhenAll(from i in Enumerable.Range(0, 10000) select WorkAsync());

We've just created 10,000 calls to WorkAsync that will be appropriately serialized on the semaphore. One of the tasks will enter the critical region, and the others will queue up on the WaitAsync call, inside SemaphoreSlim effectively enqueueing the task to be completed when someone calls Release. If Release completed that Task synchronously, then when the first task calls Release, it'll synchronously start executing the second task, and when it calls Release, it'll synchronously start executing the third task, and so on. If the "//work here" section of code above didn't include any awaits that yielded, then we're potentially going to stack dive here and eventually potentially blow out the stack.

我很难理解他所说的同步执行延续的部分。

问题

这怎么可能导致堆栈跳水?更重要的是,为了解决这个问题,RunContinuationsAsynchronously 将有效地做什么?

这里的关键概念是任务的继续可能 运行 在完成先前任务的同一个线程上同步。

让我们假设这是 SemaphoreSlim.Release 的实现(它实际上是 Toub 的 AsyncSemphore 的):

public void Release() 
{ 
    TaskCompletionSource<bool> toRelease = null; 
    lock (m_waiters) 
    { 
        if (m_waiters.Count > 0) 
            toRelease = m_waiters.Dequeue(); 
        else 
            ++m_currentCount; 
    } 
    if (toRelease != null) 
        toRelease.SetResult(true); 
}

我们可以看到它同步完成了一个任务(使用TaskCompletionSource)。 在这种情况下,如果 WorkAsync 没有其他异步点(即根本没有 await,或者所有 await 都在一个已经完成的任务上)并且调用 _gate.Release() 可能在同一线程上同步完成对 _gate.WaitAsync() 的挂起调用,您可能会达到这样一种状态:单个线程顺序释放信号量,完成下一个挂起调用,执行 // work here 然后再次释放信号量等。等等

这意味着同一个线程在堆栈中越来越深,因此堆栈潜水。

RunContinuationsAsynchronously 确保延续不会 运行 同步,因此释放信号量的线程继续移动并且延续被安排给另一个线程(哪个取决于其他延续参数例如 TaskScheduler)

这在逻辑上类似于将完成张贴到 ThreadPool:

public void Release() 
{ 
    TaskCompletionSource<bool> toRelease = null; 
    lock (m_waiters) 
    { 
        if (m_waiters.Count > 0) 
            toRelease = m_waiters.Dequeue(); 
        else 
            ++m_currentCount; 
    } 
    if (toRelease != null) 
        Task.Run(() => toRelease.SetResult(true)); 
}

How could this possibly cause a stack dive? More so, And what is RunContinuationsAsynchronously effectively going to do in order to solve that problem?

i3arnon 很好地解释了引入 RunContinuationsAsynchronously 背后的原因。我的回答与他的答案相当正交;其实我写这个也是为了自己参考(半年后我自己都不会记得任何细节了:)

首先,让我们看看TaskCompletionSourceRunContinuationsAsynchronously选项与Task.Run(() => tcs.SetResult(result))等选项有何不同。让我们尝试一个简单的控制台应用程序:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplications
{
    class Program
    {
        static void Main(string[] args)
        {
            ThreadPool.SetMinThreads(100, 100);

            Console.WriteLine("start, " + new { System.Environment.CurrentManagedThreadId });

            var tcs = new TaskCompletionSource<bool>();

            // test ContinueWith-style continuations (TaskContinuationOptions.ExecuteSynchronously)
            ContinueWith(1, tcs.Task);
            ContinueWith(2, tcs.Task);
            ContinueWith(3, tcs.Task);

            // test await-style continuations
            ContinueAsync(4, tcs.Task);
            ContinueAsync(5, tcs.Task);
            ContinueAsync(6, tcs.Task);

            Task.Run(() =>
            {
                Console.WriteLine("before SetResult, " + new { System.Environment.CurrentManagedThreadId });
                tcs.TrySetResult(true);
                Thread.Sleep(10000);
            });
            Console.ReadLine();
        }

        // log
        static void Continuation(int id)
        {
            Console.WriteLine(new { continuation = id, System.Environment.CurrentManagedThreadId });
            Thread.Sleep(1000);
        }

        // await-style continuation
        static async Task ContinueAsync(int id, Task task)
        {
            await task.ConfigureAwait(false);
            Continuation(id);
        }

        // ContinueWith-style continuation
        static Task ContinueWith(int id, Task task)
        {
            return task.ContinueWith(
                t => Continuation(id),
                CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
        }
    }
}

注意所有延续 运行 如何在调用 TrySetResult 的同一个线程上同步:

start, { CurrentManagedThreadId = 1 }
before SetResult, { CurrentManagedThreadId = 3 }
{ continuation = 1, CurrentManagedThreadId = 3 }
{ continuation = 2, CurrentManagedThreadId = 3 }
{ continuation = 3, CurrentManagedThreadId = 3 }
{ continuation = 4, CurrentManagedThreadId = 3 }
{ continuation = 5, CurrentManagedThreadId = 3 }
{ continuation = 6, CurrentManagedThreadId = 3 }

现在,如果我们不希望这种情况发生,并且我们希望每个延续 运行 异步进行(即,在没有任何同步上下文的情况下,与其他延续并行并且可能在另一个线程上)?

有一个技巧可以实现 await 风格的延续,通过安装一个假的临时同步上下文(更多细节 here):

public static class TaskExt
{
    class SimpleSynchronizationContext : SynchronizationContext
    {
        internal static readonly SimpleSynchronizationContext Instance = new SimpleSynchronizationContext();
    };

    public static void TrySetResult<TResult>(this TaskCompletionSource<TResult> @this, TResult result, bool asyncAwaitContinuations)
    {
        if (!asyncAwaitContinuations)
        {
            @this.TrySetResult(result);
            return;
        }

        var sc = SynchronizationContext.Current;
        SynchronizationContext.SetSynchronizationContext(SimpleSynchronizationContext.Instance);
        try
        {
            @this.TrySetResult(result);
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(sc);
        }
    }
}

现在,在我们的测试代码中使用tcs.TrySetResult(true, asyncAwaitContinuations: true)

start, { CurrentManagedThreadId = 1 }
before SetResult, { CurrentManagedThreadId = 3 }
{ continuation = 1, CurrentManagedThreadId = 3 }
{ continuation = 2, CurrentManagedThreadId = 3 }
{ continuation = 3, CurrentManagedThreadId = 3 }
{ continuation = 4, CurrentManagedThreadId = 4 }
{ continuation = 5, CurrentManagedThreadId = 5 }
{ continuation = 6, CurrentManagedThreadId = 6 }

注意await continuations 现在如何运行 并行(尽管,毕竟同步ContinueWith continuations)。

这个 asyncAwaitContinuations: true 逻辑是一个 hack,它只适用于 await 延续。 新的 RunContinuationsAsynchronously 使它对任何类型的延续都能一致地工作,附加到 TaskCompletionSource.Task

RunContinuationsAsynchronously 的另一个不错的方面是任何 await 样式的延续计划在特定的同步上下文中恢复 运行 在该上下文中 异步 (使用 SynchronizationContext.Post,即使 TCS.Task 相同的 上下文中完成(不同于 TCS.SetResult 的当前行为)。ContinueWith-style continuations 也将被它们相应的任务调度程序异步 运行(通常是 TaskScheduler.DefaultTaskScheduler.FromCurrentSynchronizationContext)。它们不会通过 TaskScheduler.TryExecuteTaskInline 内联。我相信 Stephen Toub 已经在他的 blog post, and it also can be seen here in CoreCLR's Task.cs.

的评论中澄清了这一点

我们为什么要担心对所有延续施加异步?

我通常在处理 async 协同执行的方法(协程)时需要它。

一个简单的例子是可暂停的异步处理:一个异步进程pauses/resumes 另一个异步进程的执行。他们的执行工作流在某些 await 点同步,TaskCompletionSource 直接或间接用于这种同步。

下面是一些现成的示例代码,它使用了 Stephen Toub 的 PauseTokenSource 的改编。在这里,一个 async 方法 StartAndControlWorkAsync 开始并定期 pauses/resumes 另一个 async 方法 DoWorkAsync。尝试将 asyncAwaitContinuations: true 更改为 asyncAwaitContinuations: false 并看到逻辑被完全破坏:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp
{
    class Program
    {
        static void Main()
        {
            StartAndControlWorkAsync(CancellationToken.None).Wait();
        }

        // Do some work which can be paused/resumed
        public static async Task DoWorkAsync(PauseToken pause, CancellationToken token)
        {
            try
            {
                var step = 0;
                while (true)
                {
                    token.ThrowIfCancellationRequested();
                    Console.WriteLine("Working, step: " + step++);
                    await Task.Delay(1000).ConfigureAwait(false);
                    Console.WriteLine("Before await pause.WaitForResumeAsync()");
                    await pause.WaitForResumeAsync();
                    Console.WriteLine("After await pause.WaitForResumeAsync()");
                }
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception: {0}", e);
                throw;
            }
        }

        // Start DoWorkAsync and pause/resume it
        static async Task StartAndControlWorkAsync(CancellationToken token)
        {
            var pts = new PauseTokenSource();
            var task = DoWorkAsync(pts.Token, token);

            while (true)
            {
                token.ThrowIfCancellationRequested();

                Console.WriteLine("Press enter to pause...");
                Console.ReadLine();

                Console.WriteLine("Before pause requested");
                await pts.PauseAsync();
                Console.WriteLine("After pause requested, paused: " + pts.IsPaused);

                Console.WriteLine("Press enter to resume...");
                Console.ReadLine();

                Console.WriteLine("Before resume");
                pts.Resume();
                Console.WriteLine("After resume");
            }
        }

        // Based on Stephen Toub's PauseTokenSource
        // http://blogs.msdn.com/b/pfxteam/archive/2013/01/13/cooperatively-pausing-async-methods.aspx
        // the main difference is to make sure that when the consumer-side code - which requested the pause - continues, 
        // the producer-side code has already reached the paused (awaiting) state.
        // E.g. a media player "Pause" button is clicked, gets disabled, playback stops, 
        // and only then "Resume" button gets enabled

        public class PauseTokenSource
        {
            internal static readonly Task s_completedTask = Task.Delay(0);

            readonly object _lock = new Object();

            bool _paused = false;

            TaskCompletionSource<bool> _pauseResponseTcs;
            TaskCompletionSource<bool> _resumeRequestTcs;

            public PauseToken Token { get { return new PauseToken(this); } }

            public bool IsPaused
            {
                get
                {
                    lock (_lock)
                        return _paused;
                }
            }

            // request a resume
            public void Resume()
            {
                TaskCompletionSource<bool> resumeRequestTcs = null;

                lock (_lock)
                {
                    resumeRequestTcs = _resumeRequestTcs;
                    _resumeRequestTcs = null;

                    if (!_paused)
                        return;
                    _paused = false;
                }

                if (resumeRequestTcs != null)
                    resumeRequestTcs.TrySetResult(true, asyncAwaitContinuations: true);
            }

            // request a pause (completes when paused state confirmed)
            public Task PauseAsync()
            {
                Task responseTask = null;

                lock (_lock)
                {
                    if (_paused)
                        return _pauseResponseTcs.Task;
                    _paused = true;

                    _pauseResponseTcs = new TaskCompletionSource<bool>();
                    responseTask = _pauseResponseTcs.Task;

                    _resumeRequestTcs = null;
                }

                return responseTask;
            }

            // wait for resume request
            internal Task WaitForResumeAsync()
            {
                Task resumeTask = s_completedTask;
                TaskCompletionSource<bool> pauseResponseTcs = null;

                lock (_lock)
                {
                    if (!_paused)
                        return s_completedTask;

                    _resumeRequestTcs = new TaskCompletionSource<bool>();
                    resumeTask = _resumeRequestTcs.Task;

                    pauseResponseTcs = _pauseResponseTcs;

                    _pauseResponseTcs = null;
                }

                if (pauseResponseTcs != null)
                    pauseResponseTcs.TrySetResult(true, asyncAwaitContinuations: true);

                return resumeTask;
            }
        }

        // consumer side
        public struct PauseToken
        {
            readonly PauseTokenSource _source;

            public PauseToken(PauseTokenSource source) { _source = source; }

            public bool IsPaused { get { return _source != null && _source.IsPaused; } }

            public Task WaitForResumeAsync()
            {
                return IsPaused ?
                    _source.WaitForResumeAsync() :
                    PauseTokenSource.s_completedTask;
            }
        }


    }

    public static class TaskExt
    {
        class SimpleSynchronizationContext : SynchronizationContext
        {
            internal static readonly SimpleSynchronizationContext Instance = new SimpleSynchronizationContext();
        };

        public static void TrySetResult<TResult>(this TaskCompletionSource<TResult> @this, TResult result, bool asyncAwaitContinuations)
        {
            if (!asyncAwaitContinuations)
            {
                @this.TrySetResult(result);
                return;
            }

            var sc = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(SimpleSynchronizationContext.Instance);
            try
            {
                @this.TrySetResult(result);
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(sc);
            }
        }
    }
}

我不想在这里使用 Task.Run(() => tcs.SetResult(result)),因为在 ThreadPool 已经被异步安排到 运行 时将它们推送到 ThreadPool 是多余的102=] 具有适当同步上下文的线程。同时,如果 StartAndControlWorkAsyncDoWorkAsync 运行 在同一个 UI 同步上下文中,我们也会有一个堆栈潜水(如果 tcs.SetResult(result) 是在没有 Task.RunSynchronizationContext.Post 包装的情况下使用。

现在,RunContinuationsAsynchronously 可能是这个问题的最佳解决方案。