SocketAsyncEventArgs "pooled byte[]" 样式是否有助于减少内存需求?
Does the SocketAsyncEventArgs "pooled byte[]" style help reduce memory requirements?
我看不出 pooled SocketAsyncEventArgs 样式如何帮助我减少为许多并发连接提供服务的服务器的内存消耗。
是的,它提供了 MS 的 Begin/End 样式的替代方案,上述 MSDN 页面描述为需要 a System.IAsyncResult object be allocated for each asynchronous socket operation
。
最初的研究让我相信出于某种原因它最多只能分配少量字节数组并在我的数千个并发连接的客户端之间共享它们。
但似乎如果我想在数千个客户端连接上等待数据,我必须调用 ReceiveAsync
数千次,每次都提供不同的字节数组(包装在 SocketAsyncEventArgs 中),并且然后,那数千个数组将一直坐在那里,直到客户端决定发送为止,这很可能是 10 秒。
因此,除非我在客户端发送数据时调用 ReceiveAsync(或者在那之后,依赖于一些网络堆栈缓冲区?)——这由客户端自行决定并且服务器无法预测,否则我运气不好字节数组将坐在那里,无所事事地等待客户移动他的底部。
我希望用一个字节数组(或者每个监听线程一个数组,如果并行化有意义的话)来监听那数千个连接,一旦这些连接中的任何一个发送了一些东西(这确实必须得到无论如何首先进入一些网络堆栈缓冲区),它将被复制到那个数组中,我的监听器被调用,一旦监听器完成,数组就可以重用。
Socket.*Async() 方法确实不可能做到这一点吗?
.net 的套接字库完全可以实现这样的功能吗?
不可能为多个套接字操作共享同一内存(或者如果您收到未定义的结果)。
一开始只读1个字节就可以规避这个问题。当读取完成时,很可能会有更多数据到来。因此,对于下一次读取,您使用更有效的大小,例如 4KB(或者您询问 DataAvailable
属性 - 这是 属性 的唯一有效用例)。
MSDN 文章解释了池的工作原理。本质上:
a) 如果有可用的池实例则使用它,否则创建一个新实例。
b) 完成后 return 将实例放入池中以便重复使用。
最终池大小会增加以容纳所有请求,或者您可以将池配置为具有最大实例计数并在有实例请求时阻塞,已达到最大池大小,并且游泳池目前是空的。此策略可防止池以不受控制的方式增长。
这是一个实现的草图,结合了我们的伟大 byte[1]
解决方法建议,并展示了如何完全隐藏有些麻烦的 Socket.xxxAsync
方法在不牺牲性能的情况下 SimpleAsyncSocket
。
使用 SimpleAsyncSocket
的简单异步回显服务器可能如下所示。
readonly static Encoding Enc = new UTF8Encoding(false);
SimpleAsyncSocket _simpleSocket;
void StartEchoServer(Socket socket)
{
_simpleSocket = new SimpleAsyncSocket(socket, OnSendCallback,
_receiveBufferPool, OnReceiveCallback);
}
bool OnReceiveCallback(SimpleAsyncSocket socket,
ArraySegment<byte> bytes)
{
var str = Enc.GetString(bytes.Array, bytes.Offset, bytes.Count);
_simpleSocket.SendAsync(new ArraySegment<byte>(Enc.GetBytes(str)));
return false;
}
void OnSendCallback(SimpleAsyncSocket asyncSocket,
ICollection<ArraySegment<byte>> collection, SocketError arg3)
{
var bytes = collection.First();
var str = Enc.GetString(bytes.Array, bytes.Offset, bytes.Count);
}
下面是实现的草图:
class SimpleAsyncSocket
{
private readonly Socket _socket;
private readonly Pool<byte[]> _receiveBufferPool;
private readonly SocketAsyncEventArgs _recvAsyncEventArgs;
private readonly SocketAsyncEventArgs _sendAsyncEventArgs;
private readonly byte[] _waitForReceiveEventBuffer = new byte[1];
private readonly Queue<ArraySegment<byte>> _sendBuffers = new Queue<ArraySegment<byte>>();
public SimpleAsyncSocket(Socket socket, Action<SimpleAsyncSocket, ICollection<ArraySegment<byte>>, SocketError> sendCallback,
Pool<byte[]> receiveBufferPool, Func<SimpleAsyncSocket, ArraySegment<byte>, bool> receiveCallback)
{
if (socket == null) throw new ArgumentNullException("socket");
if (sendCallback == null) throw new ArgumentNullException("sendCallback");
if (receiveBufferPool == null) throw new ArgumentNullException("receiveBufferPool");
if (receiveCallback == null) throw new ArgumentNullException("receiveCallback");
_socket = socket;
_sendAsyncEventArgs = new SocketAsyncEventArgs();
_sendAsyncEventArgs.UserToken = sendCallback;
_sendAsyncEventArgs.Completed += SendCompleted;
_receiveBufferPool = receiveBufferPool;
_recvAsyncEventArgs = new SocketAsyncEventArgs();
_recvAsyncEventArgs.UserToken = receiveCallback;
_recvAsyncEventArgs.Completed += ReceiveCompleted;
_recvAsyncEventArgs.SetBuffer(_waitForReceiveEventBuffer, 0, 1);
ReceiveAsyncWithoutTheHassle(_recvAsyncEventArgs);
}
public void SendAsync(ArraySegment<byte> buffer)
{
lock (_sendBuffers)
_sendBuffers.Enqueue(buffer);
StartOrContinueSending();
}
private void StartOrContinueSending(bool calledFromCompleted = false)
{
lock (_waitForReceiveEventBuffer) // reuse unrelated object for locking
{
if (!calledFromCompleted && _sendAsyncEventArgs.BufferList != null)
return; // still sending
List<ArraySegment<byte>> buffers = null;
lock (_sendBuffers)
{
if (_sendBuffers.Count > 0)
{
buffers = new List<ArraySegment<byte>>(_sendBuffers);
_sendBuffers.Clear();
}
}
_sendAsyncEventArgs.BufferList = buffers; // nothing left to send
if (buffers == null)
return;
}
if (!_socket.SendAsync(_sendAsyncEventArgs))
// Someone on Whosebug claimed that invoking the Completed
// handler synchronously might end up blowing the stack, which
// does sound possible. To avoid that guy finding my code and
// downvoting me for it (and maybe just because it's the right
// thing to do), let's leave the call stack via the ThreadPool
ThreadPool.QueueUserWorkItem(state => SendCompleted(this, _sendAsyncEventArgs));
}
private void SendCompleted(object sender, SocketAsyncEventArgs args)
{
switch (args.LastOperation)
{
case SocketAsyncOperation.Send:
{
try
{
var bytesTransferred = args.BytesTransferred;
var sendCallback = (Action<SimpleAsyncSocket, ICollection<ArraySegment<byte>>, SocketError>)args.UserToken;
// for the moment, I believe the following commented-out lock is not
// necessary, but still have to think it through properly
// lock (_waitForReceiveEventBuffer) // reuse unrelated object for locking
{
sendCallback(this, args.BufferList, args.SocketError);
}
StartOrContinueSending(true);
}
catch (Exception e)
{
args.BufferList = null;
// todo: log and disconnect
}
break;
}
case SocketAsyncOperation.None:
break;
default:
throw new Exception("Unsupported operation: " + args.LastOperation);
}
}
private void ReceiveCompleted(object sender, SocketAsyncEventArgs args)
{
switch (args.LastOperation)
{
case SocketAsyncOperation.Receive:
{
var bytesTransferred = args.BytesTransferred;
var buffer = args.Buffer;
if (args.BytesTransferred == 0) // remote end closed connection
{
args.SetBuffer(null, 0, 0);
if (buffer != _waitForReceiveEventBuffer)
_receiveBufferPool.Return(buffer);
// todo: disconnect event
return;
}
if (buffer == _waitForReceiveEventBuffer)
{
if (args.BytesTransferred == 1)
{
// we received one byte, there's probably more!
var biggerBuffer = _receiveBufferPool.Take();
biggerBuffer[0] = _waitForReceiveEventBuffer[0];
args.SetBuffer(biggerBuffer, 1, biggerBuffer.Length - 1);
ReceiveAsyncWithoutTheHassle(args);
}
else
throw new Exception("What the heck");
}
else
{
var callback = (Func<SimpleAsyncSocket, ArraySegment<byte>, bool>)args.UserToken;
bool calleeExpectsMoreDataImmediately = false;
bool continueReceiving = false;
try
{
var count = args.Offset == 1
// we set the first byte manually from _waitForReceiveEventBuffer
? bytesTransferred + 1
: bytesTransferred;
calleeExpectsMoreDataImmediately = callback(this, new ArraySegment<byte>(buffer, 0, count));
continueReceiving = true;
}
catch (Exception e)
{
// todo: log and disconnect
}
finally
{
if (!calleeExpectsMoreDataImmediately)
{
args.SetBuffer(_waitForReceiveEventBuffer, 0, 1);
_receiveBufferPool.Return(buffer);
}
}
if (continueReceiving)
ReceiveAsyncWithoutTheHassle(args);
}
break;
}
case SocketAsyncOperation.None:
break;
default:
throw new Exception("Unsupported operation: " + args.LastOperation);
}
}
private void ReceiveAsyncWithoutTheHassle(SocketAsyncEventArgs args)
{
if (!_socket.ReceiveAsync(args))
// Someone on Whosebug claimed that invoking the Completed
// handler synchronously might end up blowing the stack, which
// does sound possible. To avoid that guy finding my code and
// downvoting me for it (and maybe just because it's the right
// thing to do), let's leave the call stack via the ThreadPool
ThreadPool.QueueUserWorkItem(state => ReceiveCompleted(this, args));
}
}
我看不出 pooled SocketAsyncEventArgs 样式如何帮助我减少为许多并发连接提供服务的服务器的内存消耗。
是的,它提供了 MS 的 Begin/End 样式的替代方案,上述 MSDN 页面描述为需要 a System.IAsyncResult object be allocated for each asynchronous socket operation
。
最初的研究让我相信出于某种原因它最多只能分配少量字节数组并在我的数千个并发连接的客户端之间共享它们。
但似乎如果我想在数千个客户端连接上等待数据,我必须调用 ReceiveAsync
数千次,每次都提供不同的字节数组(包装在 SocketAsyncEventArgs 中),并且然后,那数千个数组将一直坐在那里,直到客户端决定发送为止,这很可能是 10 秒。
因此,除非我在客户端发送数据时调用 ReceiveAsync(或者在那之后,依赖于一些网络堆栈缓冲区?)——这由客户端自行决定并且服务器无法预测,否则我运气不好字节数组将坐在那里,无所事事地等待客户移动他的底部。
我希望用一个字节数组(或者每个监听线程一个数组,如果并行化有意义的话)来监听那数千个连接,一旦这些连接中的任何一个发送了一些东西(这确实必须得到无论如何首先进入一些网络堆栈缓冲区),它将被复制到那个数组中,我的监听器被调用,一旦监听器完成,数组就可以重用。
Socket.*Async() 方法确实不可能做到这一点吗?
.net 的套接字库完全可以实现这样的功能吗?
不可能为多个套接字操作共享同一内存(或者如果您收到未定义的结果)。
一开始只读1个字节就可以规避这个问题。当读取完成时,很可能会有更多数据到来。因此,对于下一次读取,您使用更有效的大小,例如 4KB(或者您询问 DataAvailable
属性 - 这是 属性 的唯一有效用例)。
MSDN 文章解释了池的工作原理。本质上:
a) 如果有可用的池实例则使用它,否则创建一个新实例。
b) 完成后 return 将实例放入池中以便重复使用。
最终池大小会增加以容纳所有请求,或者您可以将池配置为具有最大实例计数并在有实例请求时阻塞,已达到最大池大小,并且游泳池目前是空的。此策略可防止池以不受控制的方式增长。
这是一个实现的草图,结合了我们的伟大 byte[1]
解决方法建议,并展示了如何完全隐藏有些麻烦的 Socket.xxxAsync
方法在不牺牲性能的情况下 SimpleAsyncSocket
。
使用 SimpleAsyncSocket
的简单异步回显服务器可能如下所示。
readonly static Encoding Enc = new UTF8Encoding(false);
SimpleAsyncSocket _simpleSocket;
void StartEchoServer(Socket socket)
{
_simpleSocket = new SimpleAsyncSocket(socket, OnSendCallback,
_receiveBufferPool, OnReceiveCallback);
}
bool OnReceiveCallback(SimpleAsyncSocket socket,
ArraySegment<byte> bytes)
{
var str = Enc.GetString(bytes.Array, bytes.Offset, bytes.Count);
_simpleSocket.SendAsync(new ArraySegment<byte>(Enc.GetBytes(str)));
return false;
}
void OnSendCallback(SimpleAsyncSocket asyncSocket,
ICollection<ArraySegment<byte>> collection, SocketError arg3)
{
var bytes = collection.First();
var str = Enc.GetString(bytes.Array, bytes.Offset, bytes.Count);
}
下面是实现的草图:
class SimpleAsyncSocket
{
private readonly Socket _socket;
private readonly Pool<byte[]> _receiveBufferPool;
private readonly SocketAsyncEventArgs _recvAsyncEventArgs;
private readonly SocketAsyncEventArgs _sendAsyncEventArgs;
private readonly byte[] _waitForReceiveEventBuffer = new byte[1];
private readonly Queue<ArraySegment<byte>> _sendBuffers = new Queue<ArraySegment<byte>>();
public SimpleAsyncSocket(Socket socket, Action<SimpleAsyncSocket, ICollection<ArraySegment<byte>>, SocketError> sendCallback,
Pool<byte[]> receiveBufferPool, Func<SimpleAsyncSocket, ArraySegment<byte>, bool> receiveCallback)
{
if (socket == null) throw new ArgumentNullException("socket");
if (sendCallback == null) throw new ArgumentNullException("sendCallback");
if (receiveBufferPool == null) throw new ArgumentNullException("receiveBufferPool");
if (receiveCallback == null) throw new ArgumentNullException("receiveCallback");
_socket = socket;
_sendAsyncEventArgs = new SocketAsyncEventArgs();
_sendAsyncEventArgs.UserToken = sendCallback;
_sendAsyncEventArgs.Completed += SendCompleted;
_receiveBufferPool = receiveBufferPool;
_recvAsyncEventArgs = new SocketAsyncEventArgs();
_recvAsyncEventArgs.UserToken = receiveCallback;
_recvAsyncEventArgs.Completed += ReceiveCompleted;
_recvAsyncEventArgs.SetBuffer(_waitForReceiveEventBuffer, 0, 1);
ReceiveAsyncWithoutTheHassle(_recvAsyncEventArgs);
}
public void SendAsync(ArraySegment<byte> buffer)
{
lock (_sendBuffers)
_sendBuffers.Enqueue(buffer);
StartOrContinueSending();
}
private void StartOrContinueSending(bool calledFromCompleted = false)
{
lock (_waitForReceiveEventBuffer) // reuse unrelated object for locking
{
if (!calledFromCompleted && _sendAsyncEventArgs.BufferList != null)
return; // still sending
List<ArraySegment<byte>> buffers = null;
lock (_sendBuffers)
{
if (_sendBuffers.Count > 0)
{
buffers = new List<ArraySegment<byte>>(_sendBuffers);
_sendBuffers.Clear();
}
}
_sendAsyncEventArgs.BufferList = buffers; // nothing left to send
if (buffers == null)
return;
}
if (!_socket.SendAsync(_sendAsyncEventArgs))
// Someone on Whosebug claimed that invoking the Completed
// handler synchronously might end up blowing the stack, which
// does sound possible. To avoid that guy finding my code and
// downvoting me for it (and maybe just because it's the right
// thing to do), let's leave the call stack via the ThreadPool
ThreadPool.QueueUserWorkItem(state => SendCompleted(this, _sendAsyncEventArgs));
}
private void SendCompleted(object sender, SocketAsyncEventArgs args)
{
switch (args.LastOperation)
{
case SocketAsyncOperation.Send:
{
try
{
var bytesTransferred = args.BytesTransferred;
var sendCallback = (Action<SimpleAsyncSocket, ICollection<ArraySegment<byte>>, SocketError>)args.UserToken;
// for the moment, I believe the following commented-out lock is not
// necessary, but still have to think it through properly
// lock (_waitForReceiveEventBuffer) // reuse unrelated object for locking
{
sendCallback(this, args.BufferList, args.SocketError);
}
StartOrContinueSending(true);
}
catch (Exception e)
{
args.BufferList = null;
// todo: log and disconnect
}
break;
}
case SocketAsyncOperation.None:
break;
default:
throw new Exception("Unsupported operation: " + args.LastOperation);
}
}
private void ReceiveCompleted(object sender, SocketAsyncEventArgs args)
{
switch (args.LastOperation)
{
case SocketAsyncOperation.Receive:
{
var bytesTransferred = args.BytesTransferred;
var buffer = args.Buffer;
if (args.BytesTransferred == 0) // remote end closed connection
{
args.SetBuffer(null, 0, 0);
if (buffer != _waitForReceiveEventBuffer)
_receiveBufferPool.Return(buffer);
// todo: disconnect event
return;
}
if (buffer == _waitForReceiveEventBuffer)
{
if (args.BytesTransferred == 1)
{
// we received one byte, there's probably more!
var biggerBuffer = _receiveBufferPool.Take();
biggerBuffer[0] = _waitForReceiveEventBuffer[0];
args.SetBuffer(biggerBuffer, 1, biggerBuffer.Length - 1);
ReceiveAsyncWithoutTheHassle(args);
}
else
throw new Exception("What the heck");
}
else
{
var callback = (Func<SimpleAsyncSocket, ArraySegment<byte>, bool>)args.UserToken;
bool calleeExpectsMoreDataImmediately = false;
bool continueReceiving = false;
try
{
var count = args.Offset == 1
// we set the first byte manually from _waitForReceiveEventBuffer
? bytesTransferred + 1
: bytesTransferred;
calleeExpectsMoreDataImmediately = callback(this, new ArraySegment<byte>(buffer, 0, count));
continueReceiving = true;
}
catch (Exception e)
{
// todo: log and disconnect
}
finally
{
if (!calleeExpectsMoreDataImmediately)
{
args.SetBuffer(_waitForReceiveEventBuffer, 0, 1);
_receiveBufferPool.Return(buffer);
}
}
if (continueReceiving)
ReceiveAsyncWithoutTheHassle(args);
}
break;
}
case SocketAsyncOperation.None:
break;
default:
throw new Exception("Unsupported operation: " + args.LastOperation);
}
}
private void ReceiveAsyncWithoutTheHassle(SocketAsyncEventArgs args)
{
if (!_socket.ReceiveAsync(args))
// Someone on Whosebug claimed that invoking the Completed
// handler synchronously might end up blowing the stack, which
// does sound possible. To avoid that guy finding my code and
// downvoting me for it (and maybe just because it's the right
// thing to do), let's leave the call stack via the ThreadPool
ThreadPool.QueueUserWorkItem(state => ReceiveCompleted(this, args));
}
}