将 SendAsync 调用限制为每秒 5 条消息
Restricting SendAsync calls to 5 messages per second
我正在实施 Binance 的 API。
WebSocket connections have a limit of 5 incoming messages per second. A message is considered:
- A PING frame
- A PONG frame
- A JSON controlled message (e.g. subscribe, unsubscribe)
例如,有一个简单的 WebSocket 包装器(比如官方 Binance Connector 中的那个)。根据上面的限制,SendAsync 应该被限制为每秒 5 条消息。
如果几个线程同时调用 SendAsync 5 次(还要算上 ClientWebSocket 类内置的 PING 帧),它将失败。我怎样才能优雅地解决这个限制的问题?使用有界通道是一种解决方案吗?
/// <summary>
/// Wraps an <see cref="IBinanceWebSocketHandler"/>: connects to a URL, runs a background
/// receive loop that hands each received text message to the registered callbacks, and
/// sends text messages. This class performs no rate limiting of outgoing messages.
/// </summary>
public class BinanceWebSocket : IDisposable
{
    private readonly IBinanceWebSocketHandler handler;

    // NOTE(review): this list is mutated from cancellation callbacks and enumerated by the
    // receive loop without synchronization — confirm callers do not race on it.
    private readonly List<Func<string, Task>> onMessageReceivedFunctions;
    private readonly List<CancellationTokenRegistration> onMessageReceivedCancellationTokenRegistrations;
    private readonly Uri url;
    private readonly int receiveBufferSize;

    // Created in ConnectAsync; stays null until the first connection attempt.
    private CancellationTokenSource loopCancellationTokenSource;

    /// <summary>Creates a wrapper around the given handler.</summary>
    /// <param name="handler">The underlying socket abstraction used for all I/O.</param>
    /// <param name="url">Absolute endpoint URL; parsed with <see cref="Uri"/>.</param>
    /// <param name="receiveBufferSize">Size in bytes of the buffer used for each receive call.</param>
    public BinanceWebSocket(IBinanceWebSocketHandler handler, string url, int receiveBufferSize = 8192)
    {
        this.handler = handler;
        this.url = new Uri(url);
        this.receiveBufferSize = receiveBufferSize;
        this.onMessageReceivedFunctions = new List<Func<string, Task>>();
        this.onMessageReceivedCancellationTokenRegistrations = new List<CancellationTokenRegistration>();
    }

    /// <summary>
    /// Connects the handler (unless it is already open) and starts the receive loop.
    /// </summary>
    /// <param name="cancellationToken">Cancels the connect; also linked to the receive loop's token.</param>
    public async Task ConnectAsync(CancellationToken cancellationToken)
    {
        if (this.handler.State != WebSocketState.Open)
        {
            this.loopCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            await this.handler.ConnectAsync(this.url, cancellationToken);

            // StartNew with an async delegate yields Task<Task>; awaiting it only waits until
            // the loop has started (reached its first await), not until the loop finishes.
            await Task.Factory.StartNew(() => this.ReceiveLoop(loopCancellationTokenSource.Token, this.receiveBufferSize), loopCancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }
    }

    /// <summary>
    /// Cancels the receive loop (if one was started) and, when the socket is open,
    /// performs the close handshake with <see cref="WebSocketCloseStatus.NormalClosure"/>.
    /// </summary>
    /// <param name="cancellationToken">Cancels the close calls.</param>
    public async Task DisconnectAsync(CancellationToken cancellationToken)
    {
        if (this.loopCancellationTokenSource != null)
        {
            this.loopCancellationTokenSource.Cancel();
        }

        if (this.handler.State == WebSocketState.Open)
        {
            await this.handler.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);
            await this.handler.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);
        }
    }

    /// <summary>
    /// Registers a callback that is invoked with the text of every received message.
    /// </summary>
    /// <param name="onMessageReceived">Callback to invoke; its returned task is not awaited by the receive loop.</param>
    /// <param name="cancellationToken">When cancelled, the callback is removed. Pass <see cref="CancellationToken.None"/> to keep it for the socket's lifetime.</param>
    public void OnMessageReceived(Func<string, Task> onMessageReceived, CancellationToken cancellationToken)
    {
        this.onMessageReceivedFunctions.Add(onMessageReceived);
        if (cancellationToken != CancellationToken.None)
        {
            var reg = cancellationToken.Register(() =>
                this.onMessageReceivedFunctions.Remove(onMessageReceived));
            this.onMessageReceivedCancellationTokenRegistrations.Add(reg);
        }
    }

    /// <summary>Sends <paramref name="message"/> as a single, final text frame.</summary>
    /// <param name="message">The text to send.</param>
    /// <param name="cancellationToken">Cancels the send.</param>
    public async Task SendAsync(string message, CancellationToken cancellationToken)
    {
        // WebSocket text frames carry UTF-8; ASCII encoding would replace every
        // non-ASCII character with '?'. For pure-ASCII input the bytes are identical.
        byte[] byteArray = Encoding.UTF8.GetBytes(message);
        await this.handler.SendAsync(new ArraySegment<byte>(byteArray), WebSocketMessageType.Text, true, cancellationToken);
    }

    /// <summary>
    /// Disconnects (blocking until the close completes) and releases the handler,
    /// the callback registrations and the loop's cancellation source.
    /// </summary>
    public void Dispose()
    {
        this.DisconnectAsync(CancellationToken.None).Wait();
        this.handler.Dispose();
        this.onMessageReceivedCancellationTokenRegistrations.ForEach(ct => ct.Dispose());

        // Null when ConnectAsync was never called; guard so Dispose does not throw.
        this.loopCancellationTokenSource?.Dispose();
    }

    /// <summary>
    /// Receives frames until cancelled or a close frame arrives, reassembles fragmented
    /// messages, decodes them as UTF-8 and passes the text to every registered callback.
    /// </summary>
    /// <param name="cancellationToken">Stops the loop.</param>
    /// <param name="receiveBufferSize">Size in bytes of the per-receive buffer.</param>
    private async Task ReceiveLoop(CancellationToken cancellationToken, int receiveBufferSize = 8192)
    {
        try
        {
            // One buffer is reused for every receive; only the bytes actually
            // received (receiveResult.Count) are copied out of it.
            var buffer = new ArraySegment<byte>(new byte[receiveBufferSize]);
            using (var messageBytes = new System.IO.MemoryStream())
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    WebSocketReceiveResult receiveResult = await this.handler.ReceiveAsync(buffer, cancellationToken);
                    if (receiveResult.MessageType == WebSocketMessageType.Close)
                    {
                        break;
                    }

                    messageBytes.Write(buffer.Array, buffer.Offset, receiveResult.Count);
                    if (!receiveResult.EndOfMessage)
                    {
                        // Message is larger than the buffer or was sent fragmented: keep reading.
                        continue;
                    }

                    string content = Encoding.UTF8.GetString(messageBytes.GetBuffer(), 0, (int)messageBytes.Length);
                    messageBytes.SetLength(0);

                    // Callbacks are started but not awaited, so a slow callback does not stall receiving.
                    this.onMessageReceivedFunctions.ForEach(omrf => omrf(content));
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Covers TaskCanceledException as well (it derives from OperationCanceledException).
            await this.DisconnectAsync(CancellationToken.None);
        }
    }
}
第二种方法,我不是 100% 确定它能解决问题
SendAsync 通过通道在循环中被调用。SingleReader 设置为 true,表示同一时间只有一个消费者。
它在技术上应该可以解决问题,但我不是 100% 确定,因为通道可能只限制缓冲区中的消息数量。
// Unbounded queue of outgoing text messages; configured for a single reader
// and without a single-writer guarantee.
private readonly Channel<string> _messagesTextToSendQueue =
    Channel.CreateUnbounded<string>(new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = true,
    });
/// <summary>
/// Validates <paramref name="message"/> and writes it to the outgoing text queue.
/// </summary>
/// <param name="message">The text message to enqueue.</param>
/// <returns>The <see cref="ValueTask"/> returned by the channel writer's WriteAsync.</returns>
public ValueTask SendAsync(string message)
{
    // NOTE(review): presumably throws on null/empty input — confirm against Validations.
    Validations.Validations.ValidateInput(message, nameof(message));

    var queueWriter = _messagesTextToSendQueue.Writer;
    return queueWriter.WriteAsync(message);
}
/// <summary>
/// Validates <paramref name="message"/> and attempts to write it to the outgoing text queue
/// without waiting.
/// </summary>
/// <param name="message">The text message to enqueue.</param>
public void Send(string message)
{
    Validations.Validations.ValidateInput(message, nameof(message));

    // The boolean result of TryWrite is discarded: a failed write is not reported to the caller.
    var queueWriter = _messagesTextToSendQueue.Writer;
    _ = queueWriter.TryWrite(message);
}
/// <summary>
/// Consumer loop for the outgoing text queue: waits for messages, drains everything that is
/// available, and sends each message one after another. A failure to send a single message
/// is logged and the loop carries on; a failure of the loop itself restarts the sending
/// thread unless cancellation or disposal is in progress.
/// </summary>
private async Task SendTextFromQueue()
{
    var queueReader = _messagesTextToSendQueue.Reader;

    try
    {
        // WaitToReadAsync yields false once the channel is completed, which ends the loop.
        while (await queueReader.WaitToReadAsync())
        {
            while (queueReader.TryRead(out var message))
            {
                try
                {
                    await SendInternalSynchronized(message).ConfigureAwait(false);
                }
                catch (Exception e)
                {
                    Logger.Error(e, L($"Failed to send text message: '{message}'. Error: {e.Message}"));
                }
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Cancellation (including TaskCanceledException, which derives from this type): ignore.
    }
    catch (Exception e)
    {
        var shuttingDown = _cancellationTotal.IsCancellationRequested || _disposing;
        if (!shuttingDown)
        {
            Logger.Trace(L($"Sending text thread failed, error: {e.Message}. Creating a new sending thread."));
            StartBackgroundThreadForSendingText();
        }

        // When disposing/canceling there is nothing to restart; just exit.
    }
}
我会尽量保持简单,使用 SemaphoreSlim 来实现这一点。我创建了一个类来执行此任务。
/// <summary>
/// Limits how many actions may start within a time unit. Each successful
/// <see cref="WaitAsync"/> takes one semaphore slot and gives it back after the time unit
/// has elapsed, so no more than the configured number of slots are taken at any moment.
/// </summary>
public class ThrottlingLimiter
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;

    /// <summary>Creates a limiter.</summary>
    /// <param name="maxActionsPerTimeUnit">Number of slots; must be at least 1.</param>
    /// <param name="timeUnit">How long a taken slot stays unavailable; must be non-negative and at most <see cref="int.MaxValue"/> milliseconds.</param>
    /// <exception cref="ArgumentOutOfRangeException">An argument is outside the range described above.</exception>
    public ThrottlingLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        if (maxActionsPerTimeUnit < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        }

        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > int.MaxValue)
        {
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        }

        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
    }

    /// <summary>
    /// Completes as soon as a slot is free, then schedules that slot's release one time unit later.
    /// </summary>
    /// <param name="cancellationToken">Cancels the wait for a free slot.</param>
    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

        // Fire-and-forget on purpose: the caller must not wait for the slot to be returned.
        // A Task-returning helper is used instead of async void so that a failure in the
        // helper cannot surface as an unhandled exception and crash the process.
        _ = ReleaseAfterDelayAsync();
    }

    /// <summary>Returns one slot to the semaphore after the time unit has elapsed.</summary>
    private async Task ReleaseAfterDelayAsync()
    {
        await Task.Delay(_timeUnit).ConfigureAwait(false);
        _semaphore.Release();
    }
}
现在要使用这个 class,您所要做的就是设置您的限制和时间跨度
/// <summary>
/// Sends every message in <paramref name="allMessages"/>, letting at most 5 sends start per
/// second. A failure to send one message is logged and does not stop the others.
/// </summary>
/// <param name="allMessages">The messages to send.</param>
/// <returns>A task that completes when every message has been sent or has failed.</returns>
public async Task SendData(List<string> allMessages)
{
    // Limiting 5 calls per second.
    // The limiter is created per call, so the limit applies within one SendData call only;
    // overlapping calls each get their own budget.
    ThrottlingLimiter throttlingLimiter = new ThrottlingLimiter(5, TimeSpan.FromSeconds(1));

    await Task.WhenAll(allMessages.Select(async message =>
    {
        await throttlingLimiter.WaitAsync();
        try
        {
            await SendInternalSynchronized(message);
            // I am not sure what this SendInternalSynchronized returns, but returning something
            // here would let the caller track whether each send succeeded.
        }
        catch (Exception e)
        {
            Logger.Error(e, L($"Failed to send text message: '{message}'. Error: {e.Message}"));
        }
    }));
}
所以这里基本上会发生什么,无论您的列表有多大,ThrottlingLimiter 每秒只会发送 5 条消息,并等待下一秒发送下 5 条消息。
所以,在你的情况下,从下面这个调用中获取所有数据:
await _messagesTextToSendQueue.Reader.WaitToReadAsync();
然后将其存储到列表或任何集合中,并将其传递给 SendData 函数。
我正在实施 Binance 的 API。
WebSocket connections have a limit of 5 incoming messages per second. A message is considered:
- A PING frame
- A PONG frame
- A JSON controlled message (e.g. subscribe, unsubscribe)
例如,有一个简单的 WebSocket 包装器(比如官方 Binance Connector 中的那个)。根据上面的限制,SendAsync 应该被限制为每秒 5 条消息。
如果几个线程同时调用 SendAsync 5 次(还要算上 ClientWebSocket 类内置的 PING 帧),它将失败。我怎样才能优雅地解决这个限制的问题?使用有界通道是一种解决方案吗?
/// <summary>
/// Wraps an <see cref="IBinanceWebSocketHandler"/>: connects to a URL, runs a background
/// receive loop that hands each received text message to the registered callbacks, and
/// sends text messages. This class performs no rate limiting of outgoing messages.
/// </summary>
public class BinanceWebSocket : IDisposable
{
    private readonly IBinanceWebSocketHandler handler;

    // NOTE(review): this list is mutated from cancellation callbacks and enumerated by the
    // receive loop without synchronization — confirm callers do not race on it.
    private readonly List<Func<string, Task>> onMessageReceivedFunctions;
    private readonly List<CancellationTokenRegistration> onMessageReceivedCancellationTokenRegistrations;
    private readonly Uri url;
    private readonly int receiveBufferSize;

    // Created in ConnectAsync; stays null until the first connection attempt.
    private CancellationTokenSource loopCancellationTokenSource;

    /// <summary>Creates a wrapper around the given handler.</summary>
    /// <param name="handler">The underlying socket abstraction used for all I/O.</param>
    /// <param name="url">Absolute endpoint URL; parsed with <see cref="Uri"/>.</param>
    /// <param name="receiveBufferSize">Size in bytes of the buffer used for each receive call.</param>
    public BinanceWebSocket(IBinanceWebSocketHandler handler, string url, int receiveBufferSize = 8192)
    {
        this.handler = handler;
        this.url = new Uri(url);
        this.receiveBufferSize = receiveBufferSize;
        this.onMessageReceivedFunctions = new List<Func<string, Task>>();
        this.onMessageReceivedCancellationTokenRegistrations = new List<CancellationTokenRegistration>();
    }

    /// <summary>
    /// Connects the handler (unless it is already open) and starts the receive loop.
    /// </summary>
    /// <param name="cancellationToken">Cancels the connect; also linked to the receive loop's token.</param>
    public async Task ConnectAsync(CancellationToken cancellationToken)
    {
        if (this.handler.State != WebSocketState.Open)
        {
            this.loopCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            await this.handler.ConnectAsync(this.url, cancellationToken);

            // StartNew with an async delegate yields Task<Task>; awaiting it only waits until
            // the loop has started (reached its first await), not until the loop finishes.
            await Task.Factory.StartNew(() => this.ReceiveLoop(loopCancellationTokenSource.Token, this.receiveBufferSize), loopCancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }
    }

    /// <summary>
    /// Cancels the receive loop (if one was started) and, when the socket is open,
    /// performs the close handshake with <see cref="WebSocketCloseStatus.NormalClosure"/>.
    /// </summary>
    /// <param name="cancellationToken">Cancels the close calls.</param>
    public async Task DisconnectAsync(CancellationToken cancellationToken)
    {
        if (this.loopCancellationTokenSource != null)
        {
            this.loopCancellationTokenSource.Cancel();
        }

        if (this.handler.State == WebSocketState.Open)
        {
            await this.handler.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);
            await this.handler.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);
        }
    }

    /// <summary>
    /// Registers a callback that is invoked with the text of every received message.
    /// </summary>
    /// <param name="onMessageReceived">Callback to invoke; its returned task is not awaited by the receive loop.</param>
    /// <param name="cancellationToken">When cancelled, the callback is removed. Pass <see cref="CancellationToken.None"/> to keep it for the socket's lifetime.</param>
    public void OnMessageReceived(Func<string, Task> onMessageReceived, CancellationToken cancellationToken)
    {
        this.onMessageReceivedFunctions.Add(onMessageReceived);
        if (cancellationToken != CancellationToken.None)
        {
            var reg = cancellationToken.Register(() =>
                this.onMessageReceivedFunctions.Remove(onMessageReceived));
            this.onMessageReceivedCancellationTokenRegistrations.Add(reg);
        }
    }

    /// <summary>Sends <paramref name="message"/> as a single, final text frame.</summary>
    /// <param name="message">The text to send.</param>
    /// <param name="cancellationToken">Cancels the send.</param>
    public async Task SendAsync(string message, CancellationToken cancellationToken)
    {
        // WebSocket text frames carry UTF-8; ASCII encoding would replace every
        // non-ASCII character with '?'. For pure-ASCII input the bytes are identical.
        byte[] byteArray = Encoding.UTF8.GetBytes(message);
        await this.handler.SendAsync(new ArraySegment<byte>(byteArray), WebSocketMessageType.Text, true, cancellationToken);
    }

    /// <summary>
    /// Disconnects (blocking until the close completes) and releases the handler,
    /// the callback registrations and the loop's cancellation source.
    /// </summary>
    public void Dispose()
    {
        this.DisconnectAsync(CancellationToken.None).Wait();
        this.handler.Dispose();
        this.onMessageReceivedCancellationTokenRegistrations.ForEach(ct => ct.Dispose());

        // Null when ConnectAsync was never called; guard so Dispose does not throw.
        this.loopCancellationTokenSource?.Dispose();
    }

    /// <summary>
    /// Receives frames until cancelled or a close frame arrives, reassembles fragmented
    /// messages, decodes them as UTF-8 and passes the text to every registered callback.
    /// </summary>
    /// <param name="cancellationToken">Stops the loop.</param>
    /// <param name="receiveBufferSize">Size in bytes of the per-receive buffer.</param>
    private async Task ReceiveLoop(CancellationToken cancellationToken, int receiveBufferSize = 8192)
    {
        try
        {
            // One buffer is reused for every receive; only the bytes actually
            // received (receiveResult.Count) are copied out of it.
            var buffer = new ArraySegment<byte>(new byte[receiveBufferSize]);
            using (var messageBytes = new System.IO.MemoryStream())
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    WebSocketReceiveResult receiveResult = await this.handler.ReceiveAsync(buffer, cancellationToken);
                    if (receiveResult.MessageType == WebSocketMessageType.Close)
                    {
                        break;
                    }

                    messageBytes.Write(buffer.Array, buffer.Offset, receiveResult.Count);
                    if (!receiveResult.EndOfMessage)
                    {
                        // Message is larger than the buffer or was sent fragmented: keep reading.
                        continue;
                    }

                    string content = Encoding.UTF8.GetString(messageBytes.GetBuffer(), 0, (int)messageBytes.Length);
                    messageBytes.SetLength(0);

                    // Callbacks are started but not awaited, so a slow callback does not stall receiving.
                    this.onMessageReceivedFunctions.ForEach(omrf => omrf(content));
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Covers TaskCanceledException as well (it derives from OperationCanceledException).
            await this.DisconnectAsync(CancellationToken.None);
        }
    }
}
第二种方法,我不是 100% 确定它能解决问题
SendAsync 通过通道在循环中被调用。SingleReader 设置为 true,表示同一时间只有一个消费者。
它在技术上应该可以解决问题,但我不是 100% 确定,因为通道可能只限制缓冲区中的消息数量。
// Unbounded queue of outgoing text messages; configured for a single reader
// and without a single-writer guarantee.
private readonly Channel<string> _messagesTextToSendQueue =
    Channel.CreateUnbounded<string>(new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = true,
    });
/// <summary>
/// Validates <paramref name="message"/> and writes it to the outgoing text queue.
/// </summary>
/// <param name="message">The text message to enqueue.</param>
/// <returns>The <see cref="ValueTask"/> returned by the channel writer's WriteAsync.</returns>
public ValueTask SendAsync(string message)
{
    // NOTE(review): presumably throws on null/empty input — confirm against Validations.
    Validations.Validations.ValidateInput(message, nameof(message));

    var queueWriter = _messagesTextToSendQueue.Writer;
    return queueWriter.WriteAsync(message);
}
/// <summary>
/// Validates <paramref name="message"/> and attempts to write it to the outgoing text queue
/// without waiting.
/// </summary>
/// <param name="message">The text message to enqueue.</param>
public void Send(string message)
{
    Validations.Validations.ValidateInput(message, nameof(message));

    // The boolean result of TryWrite is discarded: a failed write is not reported to the caller.
    var queueWriter = _messagesTextToSendQueue.Writer;
    _ = queueWriter.TryWrite(message);
}
/// <summary>
/// Consumer loop for the outgoing text queue: waits for messages, drains everything that is
/// available, and sends each message one after another. A failure to send a single message
/// is logged and the loop carries on; a failure of the loop itself restarts the sending
/// thread unless cancellation or disposal is in progress.
/// </summary>
private async Task SendTextFromQueue()
{
    var queueReader = _messagesTextToSendQueue.Reader;

    try
    {
        // WaitToReadAsync yields false once the channel is completed, which ends the loop.
        while (await queueReader.WaitToReadAsync())
        {
            while (queueReader.TryRead(out var message))
            {
                try
                {
                    await SendInternalSynchronized(message).ConfigureAwait(false);
                }
                catch (Exception e)
                {
                    Logger.Error(e, L($"Failed to send text message: '{message}'. Error: {e.Message}"));
                }
            }
        }
    }
    catch (OperationCanceledException)
    {
        // Cancellation (including TaskCanceledException, which derives from this type): ignore.
    }
    catch (Exception e)
    {
        var shuttingDown = _cancellationTotal.IsCancellationRequested || _disposing;
        if (!shuttingDown)
        {
            Logger.Trace(L($"Sending text thread failed, error: {e.Message}. Creating a new sending thread."));
            StartBackgroundThreadForSendingText();
        }

        // When disposing/canceling there is nothing to restart; just exit.
    }
}
我会尽量保持简单,使用 SemaphoreSlim 来实现这一点。我创建了一个类来执行此任务。
/// <summary>
/// Limits how many actions may start within a time unit. Each successful
/// <see cref="WaitAsync"/> takes one semaphore slot and gives it back after the time unit
/// has elapsed, so no more than the configured number of slots are taken at any moment.
/// </summary>
public class ThrottlingLimiter
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;

    /// <summary>Creates a limiter.</summary>
    /// <param name="maxActionsPerTimeUnit">Number of slots; must be at least 1.</param>
    /// <param name="timeUnit">How long a taken slot stays unavailable; must be non-negative and at most <see cref="int.MaxValue"/> milliseconds.</param>
    /// <exception cref="ArgumentOutOfRangeException">An argument is outside the range described above.</exception>
    public ThrottlingLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        if (maxActionsPerTimeUnit < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        }

        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > int.MaxValue)
        {
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        }

        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
    }

    /// <summary>
    /// Completes as soon as a slot is free, then schedules that slot's release one time unit later.
    /// </summary>
    /// <param name="cancellationToken">Cancels the wait for a free slot.</param>
    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

        // Fire-and-forget on purpose: the caller must not wait for the slot to be returned.
        // A Task-returning helper is used instead of async void so that a failure in the
        // helper cannot surface as an unhandled exception and crash the process.
        _ = ReleaseAfterDelayAsync();
    }

    /// <summary>Returns one slot to the semaphore after the time unit has elapsed.</summary>
    private async Task ReleaseAfterDelayAsync()
    {
        await Task.Delay(_timeUnit).ConfigureAwait(false);
        _semaphore.Release();
    }
}
现在要使用这个 class,您所要做的就是设置您的限制和时间跨度
/// <summary>
/// Sends every message in <paramref name="allMessages"/>, letting at most 5 sends start per
/// second. A failure to send one message is logged and does not stop the others.
/// </summary>
/// <param name="allMessages">The messages to send.</param>
/// <returns>A task that completes when every message has been sent or has failed.</returns>
public async Task SendData(List<string> allMessages)
{
    // Limiting 5 calls per second.
    // The limiter is created per call, so the limit applies within one SendData call only;
    // overlapping calls each get their own budget.
    ThrottlingLimiter throttlingLimiter = new ThrottlingLimiter(5, TimeSpan.FromSeconds(1));

    await Task.WhenAll(allMessages.Select(async message =>
    {
        await throttlingLimiter.WaitAsync();
        try
        {
            await SendInternalSynchronized(message);
            // I am not sure what this SendInternalSynchronized returns, but returning something
            // here would let the caller track whether each send succeeded.
        }
        catch (Exception e)
        {
            Logger.Error(e, L($"Failed to send text message: '{message}'. Error: {e.Message}"));
        }
    }));
}
所以这里基本上会发生什么,无论您的列表有多大,ThrottlingLimiter 每秒只会发送 5 条消息,并等待下一秒发送下 5 条消息。
所以,在你的情况下,从下面这个调用中获取所有数据:
await _messagesTextToSendQueue.Reader.WaitToReadAsync();
然后将其存储到列表或任何集合中,并将其传递给 SendData 函数。