Channel的SingleReader设置为true时是否需要SemaphoreSlim

Is SemaphoreSlim needed when Channel's SingleReader is set to true

当发送数据足够快时,将抛出 InvalidOperationException:'已经有一个未完成的 'SendAsync' 调用此 WebSocket 实例。 ClientWebSocket.ReceiveAsyncClientWebSocket.SendAsync可以同时调用,但最多允许同时调用一个未完成的操作。

第一个示例使用 SemaphoreSlim,它一次只允许一条消息,从而防止了该问题的发生。

问题是我是否需要在第二个示例中执行相同的 SemaphoreSlim 解决方法,因为在频道选项中指定了 SingleReader = true?基本应该是一样的,不过还是希望有人确认一下,所以没有惊喜。

没有System.Threading.Channels

private readonly SemaphoreSlim sendAsyncSemaphore = new SemaphoreSlim(1);

public async Task<bool> SendMessageAsync(ReadOnlyMemory<byte> message)
{
    if (_webSocket.State != WebSocketState.Open)
    {
        return false;
    }
    
    // Only one message can be sent at a time. Wait until a message can be sent.
    await sendAsyncSemaphore.WaitAsync().ConfigureAwait(false);
    
    // We can now send a message. Our thread has entered sendAsyncSemaphore, so we should be the one to release it.
    try
    {
        await _Client.SendAsync(message, WebSocketMessageType.Text, true, connectionCancellation.Token).ConfigureAwait(true);
    }
    catch (TaskCanceledException)
    {
        sendAsyncSemaphore.Release();
        return false;
    }
    
    // Our thread can now release the sendAsyncSemaphore so another message can be sent.
    sendAsyncSemaphore.Release();
    return true;
}

public void Dispose()
{
    sendAsyncSemaphore.Dispose();
}

与System.Threading.Channels

public class WebSocketClient
{
    private readonly WebSocket _webSocket;
    private readonly Channel<string> _input;
    private readonly Channel<string> _output;

    public WebSocketClient(WebSocket webSocket)
    {
        _webSocket = webSocket;

        _input = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
        {
            SingleWriter = true
        });

        _output = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
        {
            SingleReader = true
        });
    }

    private async Task SendLoopAsync(CancellationToken cancellationToken)
    {
        await foreach (var message in _output.Reader.ReadAllAsync(cancellationToken))
        {
            if (_webSocket.State != WebSocketState.Open)
            {
                return;
            }

            var bytesToSend = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));
            await _webSocket.SendAsync(bytesToSend, WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false);
        }
    }
}

如果您设置 SingleReader = true,则不必使用 SemaphoreSlim,因为这只允许来自频道的 true 读者保证一次最多只有一个读取操作.

true readers from the channel guarantee that there will only ever be at most one read operation at a time; if no such constraint is guaranteed.false

顺便说一句,你只定义了 initialCount 但没有限制 maxCount 这意味着你将无限信号计数,这里是 SemaphoreSlim 源代码, NO_MAXIMUM是一个 constint.MaxValue.

// No maximum constant
private const int NO_MAXIMUM = int.MaxValue;


public SemaphoreSlim(int initialCount)
    : this(initialCount, NO_MAXIMUM)
{
}

你可以尝试使用另一种提供2个参数的构造方法,第一个是initialCount第二个是设置maxCount

private readonly SemaphoreSlim sendAsyncSemaphore = new SemaphoreSlim(1,1);

简答是!当 SingleReader = true 时,您仍然需要一些东西来限制未完成的读取调用的数量。 documentation 明确指出,您将 属性 设置为真 'if readers to the channel guarantee that there will only ever be at most one read operation at a time'。所以属性不限制发件人。

在我看来,使用信号量或任何其他同步原语都不是解决方法。如果您看一下代码,您会发现设置 SingleReader 选项会得到 SingleConsumerUnboundedChannel。如果您不自己限制读取器,您最终会无意中取消读取操作。