System.Threading.Channels ReadAsync() 方法正在阻止执行

System.Threading.Channels ReadAsync() method is blocking execution

概览

我正在尝试围绕 IObserver<T> 接口编写一个 IAsyncEnumerable<T> 包装器。起初我使用 BufferBlock<T> 作为后备数据存储,但通过性能测试和研究发现它实际上是一个非常慢的类型,所以我决定尝试一下 System.Threading.Channels.Channel 类型。我的 BufferBlock 实现与这个问题类似,但这次我不确定如何解决它。

问题

如果我的 IObserver<T>.OnNext() 方法尚未写入 _channel,我的 GetAsyncEnumerator() 循环将被 await _channel.Reader.WaitToRead(token) 调用阻塞。在不阻塞程序执行的情况下等待值可用于 yield 的正确方法是什么?

实施

public sealed class ObserverAsyncEnumerableWrapper<T> : IAsyncEnumerable<T>, IObserver<T>, IDisposable
{
    private readonly IDisposable _unsubscriber;
    private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();

    private bool _producerComplete;

    public ObserverAsyncEnumerableWrapper(IObservable<T> provider)
    {
        _unsubscriber = provider.Subscribe(this);
    }

    public async void OnNext(T value)
    {
        Log.Logger.Verbose("Adding value to Channel.");
        await _channel.Writer.WriteAsync(value);
    }

    public void OnError(Exception error)
    {
        _channel.Writer.Complete(error);
    }

    public void OnCompleted()
    {
        _producerComplete = true;
    }

    public async IAsyncEnumerator<T> GetAsyncEnumerator([EnumeratorCancellation] CancellationToken token = new CancellationToken())
    {
        Log.Logger.Verbose("Starting async iteration...");
        while (await _channel.Reader.WaitToReadAsync(token) || !_producerComplete)
        {
            Log.Logger.Verbose("Reading...");
            while (_channel.Reader.TryRead(out var item))
            {
                Log.Logger.Verbose("Yielding item.");
                yield return item;
            }
            Log.Logger.Verbose("Awaiting more items.");
        }
        Log.Logger.Verbose("Iteration Complete.");
        _channel.Writer.Complete();
    }

    public void Dispose()
    {
        _channel.Writer.Complete();
        _unsubscriber?.Dispose();
    }
}

其他上下文

没关系,但在运行时传递给构造函数的 IObservable<T> 实例是从对 Microsoft.Management.Infrastructure api 的异步调用返回的 CimAsyncResult。那些利用了我试图用花哨的新异步枚举模式包装的观察者设计模式。

编辑

根据一位评论者的建议,通过记录到调试器输出进行了更新,并使我的 OnNext() 方法 async/await 成为可能。你可以看到它永远不会进入 while() 循环。

Further up the call stack I was calling the async method syncronously via the GetAwaiter().GetResult() methods.

是的,that's a problem

I did this because in once case I wanted to get the data from within a constructor. I changed that implementation to execute the call using Task.Run() and now the iterators run flawlessly with both implementations.

有比阻塞异步代码更好的解决方案。 Using Task.Run is one way to avoid the deadlock,但你最终还是得到了 sub-par 的用户体验(我假设你的是 UI 应用程序,因为有 SynchronizationContext)。

如果使用异步枚举器加载显示数据,那么更合适的解决方案是(synchronously) initialize the UI to a "Loading..." state, and then update that state as the data is loaded asynchronously. If the asynchronous enumerator is used for something else, you may find some appropriate alternative patterns in my async constructors blog post