混合 ExecutionContext.SuppressFlow 和任务时 AsyncLocal.Value 的意外值

Unexpected values for AsyncLocal.Value when mixing ExecutionContext.SuppressFlow and tasks

在一个应用程序中,由于 AsyncLocal 的 wrong/unexpected 值,我遇到了奇怪的行为:尽管我抑制了执行上下文的流动,但 AsyncLocal.Value-属性 有时不是在新生成的任务的执行范围内重置。

下面我创建了一个最小的可重现示例来演示问题:

private static readonly AsyncLocal<object> AsyncLocal = new AsyncLocal<object>();
[TestMethod]
public void Test()
{
    Trace.WriteLine(System.Runtime.InteropServices.RuntimeInformation.FrameworkDescription);
    var mainTask = Task.Factory.StartNew(() =>
        {
            AsyncLocal.Value = "1";
            Task anotherTask;
            using (ExecutionContext.SuppressFlow())
            {
                anotherTask = Task.Run(() =>
                    {
                        Trace.WriteLine(AsyncLocal.Value); // "1" <- ???
                        Assert.IsNull(AsyncLocal.Value); // BOOM - FAILS
                        AsyncLocal.Value = "2";
                    });
            }

            Task.WaitAll(anotherTask);
        });

    mainTask.Wait(500000, CancellationToken.None);
}

十分之九的 运行 秒(在我的电脑上)测试方法的结果是:

.NET 6.0.2
"1"

-> 测试失败

如您所见,测试失败是因为在 Task.Run 中执行的操作中,先前的值仍然存在于 AsyncLocal.Value 中(消息:1)。

我的具体问题是:

  1. 为什么会发生这种情况? 我 怀疑 发生这种情况是因为 Task.Run 可能使用当前线程来执行工作负载。在那种情况下,我假设缺少 async/await-operators 不会强制为操作创建 new/separate ExecutionContext。正如 Stephen Cleary 所说“from the logical call context’s perspective, all synchronous invocations are “collapsed” - they’re actually part of the context of the closest async method further up the call stack"。如果是这样的话,我确实理解为什么在动作中使用相同的上下文。

这是对此行为的正确解释吗?此外,为什么它有时会完美运行(在我的机器上大约有 1 运行 out of 10)?

  1. 如何解决这个问题? 假设我上面的理论是正确的,强制引入一个新的异步“层”就足够了,如下所示:
private static readonly AsyncLocal<object> AsyncLocal = new AsyncLocal<object>();
[TestMethod]
public void Test()
{
    Trace.WriteLine(System.Runtime.InteropServices.RuntimeInformation.FrameworkDescription);
    var mainTask = Task.Factory.StartNew(() =>
        {
            AsyncLocal.Value = "1";
            Task anotherTask;
            using (ExecutionContext.SuppressFlow())
            {
                var wrapper = () =>
                    {
                        Trace.WriteLine(AsyncLocal.Value);
                        Assert.IsNull(AsyncLocal.Value); 
                        AsyncLocal.Value = "2";
                        return Task.CompletedTask;
                    };

                anotherTask = Task.Run(async () => await wrapper());
            }

            Task.WaitAll(anotherTask);
        });

    mainTask.Wait(500000, CancellationToken.None);
}

这似乎解决了问题(它在我的机器上始终有效),但我想确保这是对这个问题的正确修复。

非常感谢

如果可以,我会首先考虑是否可以用单个共享缓存替换 thread-specific 缓存。该应用程序可能早于有用的类型,例如 ConcurrentDictionary.

如果无法使用单例缓存,则可以使用 堆栈 异步本地值。堆叠异步本地值是一种常见的模式。我更喜欢将堆栈逻辑包装成一个单独的类型(下面代码中的AsyncLocalValue):

public sealed class AsyncLocalValue
{
    private static readonly AsyncLocal<ImmutableStack<object>> _asyncLocal = new();

    public object Value => _asyncLocal.Value?.Peek();

    public IDisposable PushValue(object value)
    {
        var originalValue = _asyncLocal.Value;
        var newValue = (originalValue ?? ImmutableStack<object>.Empty).Push(value);
        _asyncLocal.Value = newValue;
        return Disposable.Create(() => _asyncLocal.Value = originalValue);
    }
}

private static AsyncLocalValue AsyncLocal = new();

[TestMethod]
public void Test()
{
    Console.WriteLine(System.Runtime.InteropServices.RuntimeInformation.FrameworkDescription);
    var mainTask = Task.Factory.StartNew(() =>
    {
        Task anotherTask;
        using (AsyncLocal.PushValue("1"))
        {
            using (AsyncLocal.PushValue(null))
            {
                anotherTask = Task.Run(() =>
                {
                    Console.WriteLine("Observed: " + AsyncLocal.Value);
                    using (AsyncLocal.PushValue("2"))
                    {
                    }
                });
            }
        }

        Task.WaitAll(anotherTask);
    });

    mainTask.Wait(500000, CancellationToken.None);
}

此代码示例使用我 Nito.Disposables library 中的 Disposable.Create

Why does this happen? I suspect this happens because Task.Run may use the current thread to execute the work load.

我怀疑是因为Task.WaitAll会使用当前线程内联执行任务。

具体来说,Task.WaitAll 调用 Task.WaitAllCore, which will attempt to run it inline by calling Task.WrappedTryRunInline. I'm going to assume the default task scheduler is used throughout. In that case, this will invoke TaskScheduler.TryRunInline, which will return false if the delegate is already invoked。因此,如果任务 已经在线程池线程上 运行ning 启动,这将 return 返回到 WaitAllCore,这将只执行一个正常等待,您的代码将按预期工作(十分之一)。

如果一个线程池线程还没有拿起它(10 个中有 9 个),那么 TaskScheduler.TryRunInline 将调用 TaskScheduler.TryExecuteTaskInline, the default implementation of which will call Task.ExecuteEntryUnsafe, which calls Task.ExecuteWithThreadLocal. Task.ExecuteWithThreadLocal has logic for applying an ExecutionContext if one was captured. Assuming none was captured, the task's delegate is just invoked directly

所以,似乎每一步都符合逻辑。从技术上讲,ExecutionContext.SuppressFlow 的意思是“不要捕获 ExecutionContext”,这就是正在发生的事情。它并不意味着“清除 ExecutionContext”。有时任务在线程池线程上 运行(没有捕获的 ExecutionContext),而 WaitAll 只会等待它完成。其他时候任务将由 WaitAll 而不是线程池线程内联执行,在这种情况下 ExecutionContext 不会被清除(技术上也不会被捕获)。

您可以通过捕获 wrapper 中的当前线程 ID 并将其与执行 Task.WaitAll 的线程 ID 进行比较来测试该理论。我希望它们对于异步本地值(意外地)继承的 运行s 是同一个线程,而对于异步本地值按预期工作的 运行s 它们将是不同的线程.