Channel的SingleReader设置为true时是否需要SemaphoreSlim
Is SemaphoreSlim needed when Channel's SingleReader is set to true
当发送数据足够快时,将抛出 InvalidOperationException:'已经有一个未完成的 'SendAsync' 调用此 WebSocket 实例。 ClientWebSocket.ReceiveAsync
和ClientWebSocket.SendAsync
可以同时调用,但最多允许同时调用一个未完成的操作。
第一个示例使用 SemaphoreSlim,它一次只允许一条消息,从而防止了该问题的发生。
问题是我是否需要在第二个示例中执行相同的 SemaphoreSlim
解决方法,因为在频道选项中指定了 SingleReader = true
?基本应该是一样的,不过还是希望有人确认一下,所以没有惊喜。
没有System.Threading.Channels
private readonly SemaphoreSlim sendAsyncSemaphore = new SemaphoreSlim(1);
public async Task<bool> SendMessageAsync(ReadOnlyMemory<byte> message)
{
if (_webSocket.State != WebSocketState.Open)
{
return false;
}
// Only one message can be sent at a time. Wait until a message can be sent.
await sendAsyncSemaphore.WaitAsync().ConfigureAwait(false);
// We can now send a message. Our thread has entered sendAsyncSemaphore, so we should be the one to release it.
try
{
await _Client.SendAsync(message, WebSocketMessageType.Text, true, connectionCancellation.Token).ConfigureAwait(true);
}
catch (TaskCanceledException)
{
sendAsyncSemaphore.Release();
return false;
}
// Our thread can now release the sendAsyncSemaphore so another message can be sent.
sendAsyncSemaphore.Release();
return true;
}
public void Dispose()
{
sendAsyncSemaphore.Dispose();
}
与System.Threading.Channels
public class WebSocketClient
{
private readonly WebSocket _webSocket;
private readonly Channel<string> _input;
private readonly Channel<string> _output;
public WebSocketClient(WebSocket webSocket)
{
_webSocket = webSocket;
_input = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
{
SingleWriter = true
});
_output = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
{
SingleReader = true
});
}
private async Task SendLoopAsync(CancellationToken cancellationToken)
{
await foreach (var message in _output.Reader.ReadAllAsync(cancellationToken))
{
if (_webSocket.State != WebSocketState.Open)
{
return;
}
var bytesToSend = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));
await _webSocket.SendAsync(bytesToSend, WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false);
}
}
}
如果您设置 SingleReader = true
,则不必使用 SemaphoreSlim
,因为这只允许来自频道的 true
读者保证一次最多只有一个读取操作.
true
readers from the channel guarantee that there will only ever be at most one read operation at a time; if no such constraint is guaranteed.false
顺便说一句,你只定义了 initialCount
但没有限制 maxCount
这意味着你将无限信号计数,这里是 SemaphoreSlim 源代码, NO_MAXIMUM
是一个 const
值 int.MaxValue
.
// No maximum constant
private const int NO_MAXIMUM = int.MaxValue;
public SemaphoreSlim(int initialCount)
: this(initialCount, NO_MAXIMUM)
{
}
你可以尝试使用另一种提供2个参数的构造方法,第一个是initialCount
第二个是设置maxCount
private readonly SemaphoreSlim sendAsyncSemaphore = new SemaphoreSlim(1,1);
简答是!当 SingleReader = true
时,您仍然需要一些东西来限制未完成的读取调用的数量。 documentation 明确指出,您将 属性 设置为真 'if readers to the channel guarantee that there will only ever be at most one read operation at a time'。所以属性不限制发件人。
在我看来,使用信号量或任何其他同步原语都不是解决方法。如果您看一下代码,您会发现设置 SingleReader
选项会得到 SingleConsumerUnboundedChannel
。如果您不自己限制读取器,您最终会无意中取消读取操作。
当发送数据足够快时,将抛出 InvalidOperationException:'已经有一个未完成的 'SendAsync' 调用此 WebSocket 实例。 ClientWebSocket.ReceiveAsync
和ClientWebSocket.SendAsync
可以同时调用,但最多允许同时调用一个未完成的操作。
第一个示例使用 SemaphoreSlim,它一次只允许一条消息,从而防止了该问题的发生。
问题是我是否需要在第二个示例中执行相同的 SemaphoreSlim
解决方法,因为在频道选项中指定了 SingleReader = true
?基本应该是一样的,不过还是希望有人确认一下,所以没有惊喜。
没有System.Threading.Channels
private readonly SemaphoreSlim sendAsyncSemaphore = new SemaphoreSlim(1);
public async Task<bool> SendMessageAsync(ReadOnlyMemory<byte> message)
{
if (_webSocket.State != WebSocketState.Open)
{
return false;
}
// Only one message can be sent at a time. Wait until a message can be sent.
await sendAsyncSemaphore.WaitAsync().ConfigureAwait(false);
// We can now send a message. Our thread has entered sendAsyncSemaphore, so we should be the one to release it.
try
{
await _Client.SendAsync(message, WebSocketMessageType.Text, true, connectionCancellation.Token).ConfigureAwait(true);
}
catch (TaskCanceledException)
{
sendAsyncSemaphore.Release();
return false;
}
// Our thread can now release the sendAsyncSemaphore so another message can be sent.
sendAsyncSemaphore.Release();
return true;
}
public void Dispose()
{
sendAsyncSemaphore.Dispose();
}
与System.Threading.Channels
public class WebSocketClient
{
private readonly WebSocket _webSocket;
private readonly Channel<string> _input;
private readonly Channel<string> _output;
public WebSocketClient(WebSocket webSocket)
{
_webSocket = webSocket;
_input = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
{
SingleWriter = true
});
_output = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
{
SingleReader = true
});
}
private async Task SendLoopAsync(CancellationToken cancellationToken)
{
await foreach (var message in _output.Reader.ReadAllAsync(cancellationToken))
{
if (_webSocket.State != WebSocketState.Open)
{
return;
}
var bytesToSend = new ArraySegment<byte>(Encoding.UTF8.GetBytes(message));
await _webSocket.SendAsync(bytesToSend, WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false);
}
}
}
如果您设置 SingleReader = true
,则不必使用 SemaphoreSlim
,因为这只允许来自频道的 true
读者保证一次最多只有一个读取操作.
true
readers from the channel guarantee that there will only ever be at most one read operation at a time; if no such constraint is guaranteed.false
顺便说一句,你只定义了 initialCount
但没有限制 maxCount
这意味着你将无限信号计数,这里是 SemaphoreSlim 源代码, NO_MAXIMUM
是一个 const
值 int.MaxValue
.
// No maximum constant
private const int NO_MAXIMUM = int.MaxValue;
public SemaphoreSlim(int initialCount)
: this(initialCount, NO_MAXIMUM)
{
}
你可以尝试使用另一种提供2个参数的构造方法,第一个是initialCount
第二个是设置maxCount
private readonly SemaphoreSlim sendAsyncSemaphore = new SemaphoreSlim(1,1);
简答是!当 SingleReader = true
时,您仍然需要一些东西来限制未完成的读取调用的数量。 documentation 明确指出,您将 属性 设置为真 'if readers to the channel guarantee that there will only ever be at most one read operation at a time'。所以属性不限制发件人。
在我看来,使用信号量或任何其他同步原语都不是解决方法。如果您看一下代码,您会发现设置 SingleReader
选项会得到 SingleConsumerUnboundedChannel
。如果您不自己限制读取器,您最终会无意中取消读取操作。