等待联锁 == 0?

Waiting on Interlocked == 0?

免责声明:我的 C# 甚至不如我的 C++

我正在尝试学习如何在 C# 中执行异步套接字,以便为我的组件编写测试应用程序。我之前使用 TcpClient 的尝试以失败告终,您可以在此处阅读有关未解决的问题:

TcpClient.NetworkStream Async operations - Canceling / Disconnect

因为我无法让它工作,所以我尝试使用 Socket.BeginX 和 Socket.EndX 来代替。我走得更远了。我现在的问题是,在下面的清单中,当需要断开连接时,这又会调用套接字上的关闭和关闭,异步操作仍然未完成,它们将抛出一个对象处置异常或一个对象设置为空异常。

我在这里找到了类似的 post:

但是,我不接受这个答案,因为如果您对 预期的 行为使用异常,那么 1) 它们不是异常 2) 您无法判断异常是否是为您的预期情况抛出,或者如果它被抛出是因为您实际上在您的异步方法中使用了处置对象或空引用,而不是套接字。

在带有异步套接字代码的 C++ 中,我会跟踪带有 Interlocked 的未完成异步操作的数量,当需要断开连接时,我会调用 shutdown,然后等待 interlocked 达到 0,然后然后关闭并销毁我需要的任何成员。

在下面的清单中,我将如何在我的 Disconnect 方法中等待所有未完成的异步操作在 C# 中完成?

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

using log4net;
using System.Net.Sockets;
using System.Net;

namespace IntegrationTests
{
    public class Client2
    {
        class ReceiveContext
        {
            public Socket    m_socket;
            public const int m_bufferSize = 1024;
            public byte[]    m_buffer = new byte[m_bufferSize];
        }

        private static readonly ILog log = LogManager.GetLogger("root");

        static private ulong m_lastId = 1;

        private ulong  m_id;
        private string m_host;
        private uint   m_port;
        private uint   m_timeoutMilliseconds;
        private string m_clientId;
        private Socket m_socket;
        private uint   m_numOutstandingAsyncOps;

        public Client2(string host, uint port, string clientId, uint timeoutMilliseconds)
        {
            m_id                     = m_lastId++;
            m_host                   = host;
            m_port                   = port;
            m_clientId              = clientId;
            m_timeoutMilliseconds    = timeoutMilliseconds;
            m_socket                 = null;
            m_numOutstandingAsyncOps = 0;
        }

        ~Client2()
        {
            Disconnect();
        }

        public void Connect()
        {
            IPHostEntry ipHostInfo = Dns.GetHostEntry(m_host);
            IPAddress[] ipV4Addresses = ipHostInfo.AddressList.Where(x => x.AddressFamily == AddressFamily.InterNetwork).ToArray();
            IPAddress[] ipV6Addresses = ipHostInfo.AddressList.Where(x => x.AddressFamily == AddressFamily.InterNetworkV6).ToArray();
            IPEndPoint endpoint = new IPEndPoint(ipV4Addresses[0], (int)m_port);

            m_socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            m_socket.ReceiveTimeout = (int)m_timeoutMilliseconds;
            m_socket.SendTimeout    = (int)m_timeoutMilliseconds;

            try
            {
                m_socket.Connect(endpoint);

                log.Info(string.Format("Connected to: {0}", m_socket.RemoteEndPoint.ToString()));

                // Issue the next async receive
                ReceiveContext context = new ReceiveContext();
                context.m_socket = m_socket;
                m_socket.BeginReceive(context.m_buffer, 0, ReceiveContext.m_bufferSize, SocketFlags.None, new AsyncCallback(OnReceive), context);
            }
            catch (Exception e)
            {
                // Error
                log.Error(string.Format("Client #{0} Exception caught OnConnect. Exception: {1}"
                                       , m_id, e.ToString()));
            }
        }

        public void Disconnect()
        {
            if (m_socket != null)
            {
                m_socket.Shutdown(SocketShutdown.Both);

                // TODO - <--- Error here in the callbacks where they try to use the socket and it is disposed
                //        We need to wait here until all outstanding async operations complete
                //        Should we use Interlocked to keep track of them and wait on it somehow?
                m_socket.Close();
                m_socket = null;
            }
        }

        public void Login()
        {
            string loginRequest = string.Format("loginstuff{0})", m_clientId);
            var data = Encoding.ASCII.GetBytes(loginRequest);

            m_socket.BeginSend(data, 0, data.Length, 0, new AsyncCallback(OnSend), m_socket);
        }

        public void MakeRequest(string thingy)
        {
            string message = string.Format("requeststuff{0}", thingy);
            var data = Encoding.ASCII.GetBytes(message);

            m_socket.BeginSend(data, 0, data.Length, 0, new AsyncCallback(OnSend), m_socket);
        }

        void OnReceive(IAsyncResult asyncResult)
        {
            ReceiveContext context = (ReceiveContext)asyncResult.AsyncState;

            string data = null;
            try
            {
                int bytesReceived = context.m_socket.EndReceive(asyncResult);
                data = Encoding.ASCII.GetString(context.m_buffer, 0, bytesReceived);

                ReceiveContext newContext = new ReceiveContext();
                newContext.m_socket = context.m_socket;

                m_socket.BeginReceive(newContext.m_buffer, 0, ReceiveContext.m_bufferSize, SocketFlags.None, new AsyncCallback(OnReceive), newContext);
            }
            catch(SocketException e)
            {
                if(e.SocketErrorCode == SocketError.ConnectionAborted) // Check if we disconnected on our end
                {
                    return;
                }
            }
            catch (Exception e)
            {
                // Error
                log.Error(string.Format("Client #{0} Exception caught OnReceive. Exception: {1}"
                                       , m_id, e.ToString()));
            }
        }

        void OnSend(IAsyncResult asyncResult)
        {
            Socket socket = (Socket)asyncResult.AsyncState;

            try
            {
                int bytesSent = socket.EndSend(asyncResult);
            }
            catch(Exception e)
            {
                log.Error(string.Format("Client #{0} Exception caught OnSend. Exception: {1}"
                                       , m_id, e.ToString()));
            }
        }
    }
}

主要:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

using log4net;
using log4net.Config;

namespace IntegrationTests
{
    class Program
    {
        private static readonly ILog log = LogManager.GetLogger("root");

        static void Main(string[] args)
        {
            try
            {
                XmlConfigurator.Configure();
                log.Info("Starting Component Integration Tests...");

                Client2 client = new Client2("127.0.0.1", 24001, "MyClientId", 60000);
                client.Connect();
                client.Login();
                client.MakeRequest("StuffAndPuff");

                System.Threading.Thread.Sleep(60000); // Sim work until user shutsdown

                client.Disconnect();
            }
            catch (Exception e)
            {
                log.Error(string.Format("Caught an exception in main. Exception: {0}"
                                      , e.ToString()));
            }
        }
    }
}

编辑:

这是我尽我所能使用 Evk 提出的答案的额外尝试。据我所知,它工作正常。

这个问题是,我觉得我基本上把所有异步调用都变成了同步调用,因为需要锁定任何会改变计数器或套接字状态的东西。同样,与我的 C++ 相比,我是 C# 的新手,所以请指出我是否完全错过了解释他的答案的标记。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace IntegrationTests
{
    public class Client
    {
        class ReceiveContext
        {
            public const int     _bufferSize    = 1024;
            public byte[]        _buffer        = new byte[_bufferSize]; // Contains bytes from one receive
            public StringBuilder _stringBuilder = new StringBuilder();   // Contains bytes for multiple receives in order to build message up to delim
        }

        private static readonly ILog _log = LogManager.GetLogger("root");

        static private ulong _lastId = 1;
        private ulong  _id;

        protected string         _host;
        protected int            _port;
        protected int            _timeoutMilliseconds;
        protected string         _sessionId;
        protected Socket         _socket;
        protected object         _lockNumOutstandingAsyncOps;
        protected int            _numOutstandingAsyncOps;
        private bool             _disposed = false;

        public Client(string host, int port, string sessionId, int timeoutMilliseconds)
        {
            _id                         = _lastId++;
            _host                       = host;
            _port                       = port;
            _sessionId                  = sessionId;
            _timeoutMilliseconds        = timeoutMilliseconds;
            _socket                     = null;
            _numOutstandingAsyncOps     = 0;
            _lockNumOutstandingAsyncOps = new object();
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            if(_disposed)
            {
                return;
            }

            if (disposing)
            {
                _socket.Close();
            }

            _disposed = true;
        }

        public void Connect()
        {
            lock (_lockNumOutstandingAsyncOps)
            {
                IPHostEntry ipHostInfo = Dns.GetHostEntry(_host);
                IPAddress[] ipV4Addresses = ipHostInfo.AddressList.Where(x => x.AddressFamily == AddressFamily.InterNetwork).ToArray();
                IPAddress[] ipV6Addresses = ipHostInfo.AddressList.Where(x => x.AddressFamily == AddressFamily.InterNetworkV6).ToArray();
                IPEndPoint endpoint = new IPEndPoint(ipV4Addresses[0], _port);

                _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                _socket.ReceiveTimeout = _timeoutMilliseconds;
                _socket.SendTimeout = _timeoutMilliseconds;

                try
                {
                    _socket.Connect(endpoint);
                }
                catch (Exception e)
                {
                    // Error
                    Debug.WriteLine(string.Format("Client #{0} Exception caught OnConnect. Exception: {1}"
                                           , _id, e.ToString()));
                    return;
                }

                Debug.WriteLine(string.Format("Client #{0} connected to: {1}", _id, _socket.RemoteEndPoint.ToString()));

                // Issue the first async receive
                ReceiveContext context = new ReceiveContext();

                ++_numOutstandingAsyncOps;
                _socket.BeginReceive(context._buffer, 0, ReceiveContext._bufferSize, SocketFlags.None, new AsyncCallback(OnReceive), context);
            }
        }

        public void Disconnect()
        {
            if (_socket != null)
            {
                // We need to wait here until all outstanding async operations complete
                // In order to avoid getting 'Object was disposed' exceptions in those async ops that use the socket
                lock(_lockNumOutstandingAsyncOps)
                {
                    Debug.WriteLine(string.Format("Client #{0} Disconnecting...", _id));

                    _socket.Shutdown(SocketShutdown.Both);

                    while (_numOutstandingAsyncOps > 0)
                    {
                        Monitor.Wait(_lockNumOutstandingAsyncOps);
                    }

                    _socket.Close();
                    _socket = null;
                }
            }
        }

        public void Login()
        {
            lock (_lockNumOutstandingAsyncOps)
            {
                if (_socket != null && _socket.Connected)
                {
                    string loginRequest = string.Format("loginstuff{0}", _clientId);
                    var data = Encoding.ASCII.GetBytes(loginRequest);

                    Debug.WriteLine(string.Format("Client #{0} Sending Login Request: {1}"
                                           , _id, loginRequest));

                    ++_numOutstandingAsyncOps;
                    _socket.BeginSend(data, 0, data.Length, 0, new AsyncCallback(OnSend), _socket);
                }
                else
                {
                    Debug.WriteLine(string.Format("Client #{0} Login was called, but Socket is null or no longer connected."
                                           , _id));
                }
            }
        }

        public void MakeRequest(string thingy)
        {
            lock (_lockNumOutstandingAsyncOps)
            {
                if (_socket != null && _socket.Connected)
                {
                    string message = string.Format("requeststuff{0}", thingy);
                    var data = Encoding.ASCII.GetBytes(message);

                    Debug.WriteLine(string.Format("Client #{0} Sending Request: {1}"
                                           , _id, message));

                    ++_numOutstandingAsyncOps;
                    _socket.BeginSend(data, 0, data.Length, 0, new AsyncCallback(OnSend), _socket);
                }
                else
                {
                    Debug.WriteLine(string.Format("Client #{0} MakeRequest was called, but Socket is null or no longer connected."
                                           , _id));
                }
            }
        }

        protected void OnReceive(IAsyncResult asyncResult)
        {
            lock (_lockNumOutstandingAsyncOps)
            {
                ReceiveContext context = (ReceiveContext)asyncResult.AsyncState;

                string data = null;

                try
                {
                    int bytesReceived = _socket.EndReceive(asyncResult);
                    data = Encoding.ASCII.GetString(context._buffer, 0, bytesReceived);

                    // If the remote host shuts down the Socket connection with the Shutdown method, and all available data has been received,
                    // the EndReceive method will complete immediately and return zero bytes
                    if (bytesReceived > 0)
                    {
                        StringBuilder stringBuilder = context._stringBuilder.Append(data);

                        int index = -1;
                        do
                        {
                            index = stringBuilder.ToString().IndexOf("#");
                            if (index != -1)
                            {
                                string message = stringBuilder.ToString().Substring(0, index + 1);
                                stringBuilder.Remove(0, index + 1);

                                Debug.WriteLine(string.Format("Client #{0} Received Data: {1}"
                                                       , _id, message));
                            }
                        } while (index != -1);
                    }
                }
                catch (SocketException e)
                {
                    // Check if we disconnected on our end
                    if (e.SocketErrorCode == SocketError.ConnectionAborted)
                    {
                        // Ignore
                    }
                    else
                    {
                        // Error
                        Debug.WriteLine(string.Format("Client #{0} SocketException caught OnReceive. Exception: {1}"
                                               , _id, e.ToString()));
                        Disconnect();
                    }
                }
                catch (Exception e)
                {
                    // Error
                    Debug.WriteLine(string.Format("Client #{0} Exception caught OnReceive. Exception: {1}"
                                           , _id, e.ToString()));
                    Disconnect();
                }
                finally
                {
                    --_numOutstandingAsyncOps;
                    Monitor.Pulse(_lockNumOutstandingAsyncOps);
                }
            }

            // Issue the next async receive
            lock (_lockNumOutstandingAsyncOps)
            {
                if (_socket != null && _socket.Connected)
                {
                    ++_numOutstandingAsyncOps;

                    ReceiveContext newContext = new ReceiveContext();
                    _socket.BeginReceive(newContext._buffer, 0, ReceiveContext._bufferSize, SocketFlags.None, new AsyncCallback(OnReceive), newContext);
                }
            }
        }

        protected void OnSend(IAsyncResult asyncResult)
        {
            lock (_lockNumOutstandingAsyncOps)
            {
                try
                {
                    int bytesSent = _socket.EndSend(asyncResult);
                }
                catch (Exception e)
                {
                    Debug.WriteLine(string.Format("Client #{0} Exception caught OnSend. Exception: {1}"
                                           , _id, e.ToString()));
                    Disconnect();
                }
                finally
                {
                    --_numOutstandingAsyncOps;
                    Monitor.Pulse(_lockNumOutstandingAsyncOps);
                }
            }
        }
    }
}

您可以使用 Monitor.WaitMonitor.Pulse

static int _outstandingOperations;
static readonly object _lock = new object();
static void Main() {
    for (int i = 0; i < 100; i++) {
        var tmp = i;
        Task.Run(() =>
        {
            lock (_lock) {
                _outstandingOperations++;
            }
            // some work
            Thread.Sleep(new Random(tmp).Next(0, 5000));
            lock (_lock) {
                _outstandingOperations--;
                // notify condition might have changed
                Monitor.Pulse(_lock);
            }
        });
    }

    lock (_lock) {
        // condition check
        while (_outstandingOperations > 0)
            // will wait here until pulsed, lock will be released during wait
            Monitor.Wait(_lock);
    }
}