使用 async/await 时是否需要使字段线程安全?

Is it needed to make fields thread-safe when using async/await?

有时我会遇到 async/await 访问对象字段的代码。例如来自无状态项目的 snippet 代码:

private readonly Queue<QueuedTrigger> _eventQueue = new Queue<QueuedTrigger>();
private bool _firing;

async Task InternalFireQueuedAsync(TTrigger trigger, params object[] args)
{
    if (_firing)
    {
        _eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args });
        return;
    }

    try
    {
        _firing = true;

        await InternalFireOneAsync(trigger, args).ConfigureAwait(false);

        while (_eventQueue.Count != 0)
        {
            var queuedEvent = _eventQueue.Dequeue();
            await InternalFireOneAsync(queuedEvent.Trigger, queuedEvent.Args).ConfigureAwait(false);
        }
    }
    finally
    {
        _firing = false;
    }
}

如果我理解正确,await **.ConfigureAwait(false) 表示在 await 之后执行的代码必须在相同的上下文中执行 not necessarily。所以这里的 while 循环可以在 ThreadPool 线程上执行。我看不出是什么确保 _firing_eventQueue 字段同步,例如,是什么在此处创建了 lock/memory-fence/barrier?所以我的问题是;我需要使字段线程安全,还是 async/await 结构中的某些东西负责这个?

编辑:澄清我的问题;在这种情况下 InternalFireQueuedAsync 应该总是在同一个线程上调用。在那种情况下,只有延续可以 运行 在不同的线程上,这让我想知道,我是否需要同步机制(如显式屏障)来确保值同步以避免此处描述的问题:http://www.albahari.com/threading/part4.aspx

编辑 2:在 stateless 上也有一个小讨论: https://github.com/dotnet-state-machine/stateless/issues/294

为了安全起见,您应该将字段 _firing 标记为 volatile - 这将保证内存屏障并确保延续部分可能 运行 在不同的线程上, 将读取正确的值。如果没有 volatile,编译器、CLR 或 JIT 编译器,甚至 CPU 可能会进行一些优化,导致代码为其读取错误的值。

至于_eventQueue,你没有修改字段,所以标记为volatile是没有用的。如果只有一个线程调用'InternalFireQueuedAsync',你不会同时从多个线程访问它,所以你没问题。

但是,如果多个线程调用 InternalFireQueuedAsync,您将需要使用 ConcurrentQueue,或者锁定对 _eventQueue 的访问。然后,您最好也锁定对 _firing 的访问权限,或使用 Interlocked 访问它,或将其替换为 ManualResetEvent.

ConfigureAwait(false) 表示 Context 未捕获到 运行 延续。使用线程池上下文并不意味着延续 运行 并行。在 while 循环之前和之内使用 await 可确保代码(延续)按顺序 运行 因此在这种情况下无需锁定。 但是,在检查 _firing 值时,您可能会遇到 竞争条件

使用 lockConcurrentQueue

lock 的解决方案:

private readonly Queue<QueuedTrigger> _eventQueue = new Queue<QueuedTrigger>();
private bool _firing;
private object _eventQueueLock = new object();

async Task InternalFireQueuedAsync(TTrigger trigger, params object[] args)
{
if (_firing)
{
    lock(_eventQueueLock)
       _eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args });
    return;
}

try
{
    _firing = true;

    await InternalFireOneAsync(trigger, args).ConfigureAwait(false);

    lock(_eventQueueLock)
    while (_eventQueue.Count != 0)
    {
        var queuedEvent = _eventQueue.Dequeue();
        await InternalFireOneAsync(queuedEvent.Trigger, queuedEvent.Args).ConfigureAwait(false);
    }
}


finally
{
    _firing = false;
}

}

ConcurrentQueue 的解决方案:

private readonly ConccurentQueue<QueuedTrigger> _eventQueue = new ConccurentQueue<QueuedTrigger>();
private bool _firing;

async Task InternalFireQueuedAsync(TTrigger trigger, params object[] args)
{
if (_firing)
{
    _eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args });
    return;
}

try
{
    _firing = true;

    await InternalFireOneAsync(trigger, args).ConfigureAwait(false);

    lock(_eventQueueLock)
    while (_eventQueue.Count != 0)
    {
        object queuedEvent; // change object > expected type
        if(!_eventQueue.TryDequeue())
           continue;
        await InternalFireOneAsync(queuedEvent.Trigger, queuedEvent.Args).ConfigureAwait(false);
    }
}


finally
{
    _firing = false;
}

}

I don't see what is making sure that the _firing and _eventQueue fields are synchronized, for example what is creating the a lock/memory-fence/barrier here? So my question is; do I need to make the fields thread-safe, or is something in the async/await structure taking care of this?

await 将确保所有必要的内存屏障都到位。然而,这并不能使他们 "thread-safe".

in this case InternalFireQueuedAsync should always be called on the same thread.

那么_firing就可以了,不需要volatile之类的。

但是,_eventQueue的用法是不正确的。考虑一个线程池线程恢复了await之后的代码会发生什么:完全有可能Queue<T>.Count或者Queue<T>.Dequeue()同时被一个线程池线程调用Queue<T>.Enqueue 被主线程调用。这不是线程安全的。

如果调用 InternalFireQueuedAsync 的主线程是具有单线程上下文的线程(例如 UI 线程),那么一个简单的解决方法是删除 [=19] 的所有实例=] 在这个方法中。