Unobserved task exception under stress during WebSocket SendAsync

I am stress-testing a service I am writing that uses a WebSocket obtained from AcceptWebSocketAsync. The code I use to send messages over the WebSocket is this:

    static bool
    SendMessage(WebSocket webSocket, WebSocketMessage message, byte[] buffer, CancellationToken cancellationToken)
    {
        try {
            var endOfMessage = false;
            do {
                using(var timeout = new CancellationTokenSource(webSocketsTimeout))
                using(var lcts    = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeout.Token)) {
                    var count    = message.Content.Read(buffer, 0, buffer.Length);
                    endOfMessage = count < buffer.Length;
                    // ReSharper disable once MethodSupportsCancellation
                    webSocket
                        .SendAsync(new ArraySegment<byte>(buffer, 0, count), message.Type, endOfMessage, lcts.Token)
                        .Wait() // SendAsync should be canceled using the Token.
                    ;
                }
            } while(endOfMessage == false);

            return true;
        }
        catch(Exception e) {
            TraceConnectionError(e);
            return false;
        }
        finally {
            message.Dispose();
        }
    }

My problem is that under "stress" (I open and close 6 connections every 30 seconds until the system fails) I get:

  Unhandled Exception: System.AggregateException: A Task's exception(s) were not observed either by Waiting on the Task or accessing its Exception property. As a result, the unobserved exception was rethrown by the finalizer thread. ---> System.Net.HttpListenerException: An operation was attempted on a nonexistent network connection
     at System.Net.WebSockets.WebSocketHttpListenerDuplexStream.WriteAsyncFast(HttpListenerAsyncEventArgs eventArgs)
     at System.Net.WebSockets.WebSocketHttpListenerDuplexStream.<MultipleWriteAsyncCore>d__38.MoveNext()
  --- End of stack trace from previous location where exception was thrown ---
     at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
     at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
     at System.Net.WebSockets.WebSocketBase.<SendFrameAsync>d__48.MoveNext()
  --- End of stack trace from previous location where exception was thrown ---
     at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
     at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
     at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
     at System.Net.WebSockets.WebSocketBase.WebSocketOperation.<Process>d__19.MoveNext()
  --- End of stack trace from previous location where exception was thrown ---
     at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
     at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
     at System.Net.WebSockets.WebSocketBase.<SendAsyncCore>d__47.MoveNext()
     --- End of inner exception stack trace ---
     at System.Threading.Tasks.TaskExceptionHolder.Finalize()

Isn't the Wait() I am using enough to "observe" the task exception?

The problem was a race condition in the .NET Framework code.

I have reported the bug here
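
To illustrate what "observing" means here: Wait() only observes the exception of the task it is called on. A faulted task that is held internally and never waited on stays unobserved until something reads its Exception property. The sketch below is not the framework code; `ObservationDemo` and its method names are hypothetical, and a plain TaskCompletionSource stands in for the internal completion sources.

```csharp
using System;
using System.Threading.Tasks;

public static class ObservationDemo
{
    // Hypothetical helper: builds an already-faulted task, standing in for
    // an internal completion source that failed.
    public static Task CreateFaultedTask(string message)
    {
        var tcs = new TaskCompletionSource<object>();
        tcs.SetException(new InvalidOperationException(message));
        return tcs.Task;
    }

    // Wait() rethrows the fault as an AggregateException and thereby
    // observes the exception of the task it was called on.
    public static bool WaitObservesOwnTask()
    {
        var task = CreateFaultedTask("outer");
        try {
            task.Wait();
            return false;
        }
        catch(AggregateException e) {
            return e.InnerException is InvalidOperationException;
        }
    }

    // Reading Task.Exception is enough to mark a fault as observed,
    // even when nobody ever waits on the task.
    public static bool Observe(Task task)
    {
        return task.Exception != null;
    }
}
```

So the Wait() in SendMessage does observe the exception of the task returned by SendAsync, but it cannot reach a second faulted task that lives inside the stream.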

As a workaround, I keep a list of used WebSockets, which I check periodically for State != Open, and then call this code:

    using System.Net.WebSockets;
    using System.Reflection;
    using System.Threading.Tasks;

    // Note: Maybe(...) is a null-propagating extension helper that is not shown here.
    public static class WebSocketXs
    {
        // Private framework fields, looked up by name through reflection.
        static readonly Assembly  assembly                    = typeof(WebSocket).Assembly;
        static readonly FieldInfo m_InnerStream               = assembly.GetType("System.Net.WebSockets.WebSocketBase").GetField(nameof(m_InnerStream), BindingFlags.NonPublic | BindingFlags.Instance);
        static readonly FieldInfo m_ReadTaskCompletionSource  = assembly.GetType("System.Net.WebSockets.WebSocketHttpListenerDuplexStream").GetField(nameof(m_ReadTaskCompletionSource),  BindingFlags.NonPublic | BindingFlags.Instance);
        static readonly FieldInfo m_WriteTaskCompletionSource = assembly.GetType("System.Net.WebSockets.WebSocketHttpListenerDuplexStream").GetField(nameof(m_WriteTaskCompletionSource), BindingFlags.NonPublic | BindingFlags.Instance);
        static readonly FieldInfo[] completionSourceFields    = { m_ReadTaskCompletionSource, m_WriteTaskCompletionSource };

        /// <summary>
        /// This works around a race that happens when a <see cref="WebSocket"/> fails and is then aborted.
        /// The <see cref="completionSourceFields"/> hold an Exception that is never observed, because the <see cref="WebSocket.Abort()"/>
        /// applied to the WebSocketBase <see cref="m_InnerStream"/> only calls <see cref="TaskCompletionSource{TResult}.TrySetCanceled()"/>, which
        /// does nothing with the completion source's <see cref="Task.Exception"/>.
        /// That in turn raises a <see cref="TaskScheduler.UnobservedTaskException"/>.
        /// </summary>
        public static void
        CleanUpAndDispose(this WebSocket ws)
        {
            foreach(var completionSourceField in completionSourceFields) {
                m_InnerStream
                    .GetValue(ws)
                    .Maybe(completionSourceField.GetValue)
                    .Maybe(s => s as TaskCompletionSource<object>)?
                    .Task
                    .Exception
                    .Maybe(_ => {}) // We just need to observe any exception.
                ;
            }
            ws.Dispose();
        }
    }
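
The `Maybe` extension used above is not part of the framework, and its definition is not included in the post. A minimal sketch of what it could look like, assuming it is a plain null-propagating helper with one overload that maps a value and one that only runs a side effect (the class name `MaybeXs` is hypothetical):

```csharp
using System;

public static class MaybeXs
{
    // Applies selector when value is non-null; otherwise propagates null.
    public static TResult Maybe<T, TResult>(this T value, Func<T, TResult> selector)
        where T : class
        where TResult : class
    {
        return value == null ? null : selector(value);
    }

    // Runs action only when value is non-null.
    public static void Maybe<T>(this T value, Action<T> action)
        where T : class
    {
        if(value != null) action(value);
    }
}
```

With a helper of this shape, each step of the chain in CleanUpAndDispose is skipped when the previous one produced null, so a socket whose inner stream or completion source is missing is simply disposed.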