JSON-RPC - 使用 TaskCompletionSource (SetException) 处理异常

JSON-RPC - Handling exceptions with TaskCompletionSource (SetException)

有一个 JSON-RPC API,我目前正在实现它的客户端。可以在这里(here)进行测试。

问题是,如果将不正确的 DTO 模型传递给 SendAsync<TResponse>,JsonSerializer.Deserialize 将抛出一个 JsonException,而我的代码没有处理它。我知道必须以某种方式使用 SetException,但不知道该怎么做,这就是我的问题。异常消息还应该打印到控制台。

/// <summary>
/// JSON-RPC client that matches incoming messages to pending requests by their
/// <c>id</c> property and forwards messages that carry no <c>id</c> to
/// <c>_messageReceivedSubject</c>.
/// </summary>
public sealed class Client : IDisposable
{
    ...

    // Pending response handlers, keyed by the id of the request they belong to.
    private readonly ConcurrentDictionary<long, IResponseHandler> _handlers = new();

    ...

    /// <summary>
    /// Starts a background loop that reads messages from <c>_client.Start</c> and
    /// dispatches each one either to its pending handler or to the subscription subject.
    /// </summary>
    /// <param name="cancellationToken">Token passed to both <c>_client.Start</c> and <c>Task.Run</c>.</param>
    /// <returns>An already-completed task; the receive loop itself is not awaited.</returns>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        // The task is discarded, so an exception that escapes the loop below is never
        // observed by the caller.
        _ = Task.Run(async () =>
        {
            await foreach (var message in _client.Start(cancellationToken))
            {
                // NOTE(review): parsing happens outside the try block, so a malformed
                // message throws out of the loop.
                using var response = JsonDocument.Parse(message);

                try
                {
                    // GetProperty throws KeyNotFoundException when "id" is absent; that is
                    // what routes subscription messages to the catch block below.
                    var requestId = response.RootElement.GetProperty("id").GetInt32();

                    // TODO: Handle JsonException errors via `SetException`?
                    // TODO: Show error when incorrect input parameters are filled
                    // NOTE(review): SetResult deserializes the payload; a JsonException thrown
                    // there is not caught by the KeyNotFoundException handler, so it ends the
                    // loop and the handler's task is never completed.
                    // NOTE(review): ContainsKey + indexer + TryRemove are three separate
                    // lookups; a single TryRemove(requestId, out var handler) would do the same.
                    if (_handlers.ContainsKey(requestId))
                    {
                        _handlers[requestId].SetResult(message);
                        _handlers.TryRemove(requestId, out _);
                    }
                }
                catch (KeyNotFoundException)
                {
                    // My point is that a message should be processed only if it doesn't include `id`,
                    // because that means that the message is an actual web socket subscription.
                    _messageReceivedSubject.OnNext(message);
                }
            }
        }, cancellationToken);

        ...

        return Task.CompletedTask;
    }

    /// <summary>
    /// Builds a JSON-RPC 2.0 request, registers a response handler under its id and
    /// sends the serialized request.
    /// </summary>
    /// <typeparam name="TResponse">Type the response payload is deserialized into.</typeparam>
    /// <param name="method">Value assigned to the request's <c>Method</c>.</param>
    /// <param name="params">Value assigned to the request's <c>Params</c>.</param>
    /// <returns>The handler's task, completed when the receive loop calls <c>SetResult</c>.</returns>
    public Task<TResponse> SendAsync<TResponse>(string method, object @params)
    {
        var request = new JsonRpcRequest<object>
        {
            JsonRpc = "2.0",
            Id = NextId(),
            Method = method,
            Params = @params
        };

        //var tcs = new TaskCompletionSource<TResponse>();
        //_requestManager.Add(request.Id, request, tcs);

        // The handler is registered before the request is sent.
        var handler = new ResponseHandlerBase<TResponse>();
        _handlers[request.Id] = handler;

        var message = JsonSerializer.Serialize(request);

        // NOTE(review): the send task is discarded, so a failed send is not reported
        // and the handler stays registered.
        _ = _client.SendAsync(message);

        return handler.Task;

        //return tcs.Task;
    }

    /// <summary>
    /// Sends a <c>public/auth</c> request using the <c>client_credentials</c> grant type.
    /// </summary>
    /// <param name="clientId">Sent as <c>client_id</c>.</param>
    /// <param name="clientSecret">Sent as <c>client_secret</c>.</param>
    /// <returns>The <c>Result</c> of the deserialized response.</returns>
    public async Task<AuthResponse?> AuthenticateAsync(string clientId, string clientSecret)
    {
        var @params = new Dictionary<string, string>
        {
            {"grant_type", "client_credentials"},
            {"client_id", clientId},
            {"client_secret", clientSecret}
        };

        var response = await SendAsync<SocketResponse<AuthResponse>>("public/auth", @params).ConfigureAwait(false);
        return response.Result;
    }

    ...

    // Type-erased view of a pending request so handlers for different response
    // types can share one dictionary.
    private interface IResponseHandler
    {
        // Completes the pending request from the raw JSON payload.
        void SetResult(string payload);
    }

    // Wraps a TaskCompletionSource<TRes> and deserializes the payload into TRes.
    private class ResponseHandlerBase<TRes> : IResponseHandler
    {
        private readonly TaskCompletionSource<TRes> _tcs = new();

        // Task observed by the caller of SendAsync.
        public Task<TRes> Task => _tcs.Task;

        public void SetResult(string payload)
        {
            // NOTE(review): Deserialize can throw JsonException; nothing here forwards it
            // to _tcs, so the exception propagates to the caller of SetResult instead.
            var result = JsonSerializer.Deserialize(payload, typeof(TRes));
            _tcs.SetResult((TRes) result);
        }
    }
}

巧合的是,我上周在直播编写 TCP/IP 聊天应用程序时,做了一件非常类似的事情。

由于在这种情况下您已经有一个 IAsyncEnumerable<string>,并且您确实可能收到响应以外的消息,我建议同样对外公开一个 IAsyncEnumerable<string>:

public sealed class Client : IDisposable
{
  /// <summary>
  /// Streams every message produced by the underlying client to the caller.
  /// </summary>
  /// <param name="cancellationToken">Token forwarded to <c>_client.Start</c>.</param>
  /// <returns>The messages, in the order they are received.</returns>
  public async IAsyncEnumerable<string> Start(CancellationToken cancellationToken)
  {
    var incomingMessages = _client.Start(cancellationToken);

    // TODO: parse and handle responses for our requests
    await foreach (var incoming in incomingMessages)
    {
      yield return incoming;
    }
  }
}

如果您愿意,可以将其更改为 Rx-based (_messageReceivedSubject.OnNext),但我认为如果您已经拥有 IAsyncEnumerable<T>,那么您最好保持相同的抽象。

然后,您可以解析和检测响应,传递所有其他消息:

public sealed class Client : IDisposable
{
  /// <summary>
  /// Reads messages from the underlying client and yields every message that is
  /// not a response to one of our requests (that is, has no numeric <c>id</c>).
  /// </summary>
  /// <param name="cancellationToken">Token forwarded to <c>_client.Start</c>.</param>
  /// <returns>The non-response messages, in the order they are received.</returns>
  public async IAsyncEnumerable<string> Start(CancellationToken cancellationToken)
  {
    await foreach (var message in _client.Start(cancellationToken))
    {
      var (requestId, response) = TryParseResponse(message);
      if (requestId != null)
      {
        // TODO: handle (whoever consumes `response` is responsible for disposing it)
      }
      else
      {
        yield return message;
      }
    }

    // Returns the request id and the parsed document when the message is a JSON
    // object with a numeric "id"; otherwise returns (null, null).
    (long? RequestId, JsonDocument? Response) TryParseResponse(string message)
    {
      JsonDocument? document = null;
      try
      {
        document = JsonDocument.Parse(message);
        var root = document.RootElement;

        // The ValueKind checks keep TryGetProperty/TryGetInt64 from throwing
        // InvalidOperationException on non-object roots or non-numeric ids.
        if (root.ValueKind == JsonValueKind.Object
            && root.TryGetProperty("id", out var idElement)
            && idElement.ValueKind == JsonValueKind.Number
            && idElement.TryGetInt64(out var id))
        {
          return (id, document);
        }
      }
      catch (JsonException)
      {
        // Not valid JSON: treat it as a non-response message.
      }

      // Not a response: release the pooled buffers held by the document.
      document?.Dispose();
      return (null, null);
    }
  }
}

然后,您可以定义未完成请求的集合并处理这些请求的消息:

public sealed class Client : IDisposable
{
  // Outstanding requests keyed by request id. The key is long so it matches the
  // id type produced by TryParseResponse.
  private readonly ConcurrentDictionary<long, TaskCompletionSource<JsonDocument>> _requests = new();

  /// <summary>
  /// Reads messages from the underlying client, completes the outstanding request
  /// whose id matches a response, and yields every other message to the caller.
  /// </summary>
  /// <param name="cancellationToken">Token forwarded to <c>_client.Start</c>.</param>
  /// <returns>The messages that did not match an outstanding request.</returns>
  public async IAsyncEnumerable<string> Start(CancellationToken cancellationToken)
  {
    await foreach (var message in _client.Start(cancellationToken))
    {
      var (requestId, response) = TryParseResponse(message);
      if (requestId != null && _requests.TryRemove(requestId.Value, out var tcs))
      {
        // Ownership of the document passes to whoever awaits the task; if the
        // task was already completed, nobody will read it, so dispose it here.
        if (!tcs.TrySetResult(response!))
        {
          response!.Dispose();
        }
      }
      else
      {
        // No outstanding request for this message: release the document (if any)
        // and pass the raw message through.
        response?.Dispose();
        yield return message;
      }
    }

    // Returns the request id and the parsed document when the message is a JSON
    // object with a numeric "id"; otherwise returns (null, null).
    (long? RequestId, JsonDocument? Response) TryParseResponse(string message)
    {
      JsonDocument? document = null;
      try
      {
        document = JsonDocument.Parse(message);
        var root = document.RootElement;

        // The ValueKind checks keep TryGetProperty/TryGetInt64 from throwing
        // InvalidOperationException on non-object roots or non-numeric ids.
        if (root.ValueKind == JsonValueKind.Object
            && root.TryGetProperty("id", out var idElement)
            && idElement.ValueKind == JsonValueKind.Number
            && idElement.TryGetInt64(out var id))
        {
          return (id, document);
        }
      }
      catch (JsonException)
      {
        // Not valid JSON: treat it as a non-response message.
      }

      document?.Dispose();
      return (null, null);
    }
  }
}

注意 ConcurrentDictionary.TryRemove 的用法:它以原子方式取出并删除值,这比先访问值、然后再删除它更安全。

现在您可以编写通用的 SendAsync 了。正如我在视频中指出的那样,我更喜欢将 SendAsync 中同步运行的代码与 await 响应的代码分开:

public sealed class Client : IDisposable
{
  ...

  /// <summary>
  /// Sends a JSON-RPC 2.0 request and asynchronously waits for the response that
  /// carries the same id.
  /// </summary>
  /// <typeparam name="TResponse">Type the response document is deserialized into.</typeparam>
  /// <param name="method">Value assigned to the request's <c>Method</c>.</param>
  /// <param name="params">Value assigned to the request's <c>Params</c>.</param>
  /// <returns>A task that produces the deserialized response.</returns>
  /// <exception cref="JsonException">
  /// The response cannot be deserialized into <typeparamref name="TResponse"/>;
  /// surfaced through the returned task.
  /// </exception>
  public Task<TResponse> SendAsync<TResponse>(string method, object @params)
  {
    var request = new JsonRpcRequest<object>
    {
      JsonRpc = "2.0",
      Id = NextId(),
      Method = method,
      Params = @params,
    };

    // Register the pending request before sending, so the response cannot arrive
    // before there is an entry to complete.
    var tcs = new TaskCompletionSource<JsonDocument>(TaskCreationOptions.RunContinuationsAsynchronously);
    _requests.TryAdd(request.Id, tcs);
    return SendRequestAndWaitForResponseAsync();

    async Task<TResponse> SendRequestAndWaitForResponseAsync()
    {
      try
      {
        var message = JsonSerializer.Serialize(request);
        await _client.SendAsync(message);
      }
      catch
      {
        // The request never went out, so no response will come: drop the entry
        // instead of leaving it in the dictionary forever.
        _requests.TryRemove(request.Id, out _);
        throw;
      }

      // The document holds pooled buffers; dispose it once deserialization is done.
      using var response = await tcs.Task;

      // The generic overload returns TResponse directly (the Type-based overload
      // returns object). Any JsonException propagates through the async state machine.
      return response.Deserialize<TResponse>()!;
    }
  }
}

我完全删除了“处理程序”概念,因为它只是为 JsonSerializer.Deserialize 提供类型。此外,通过使用本地 async 方法,我可以使用 async 状态机自然传播异常。

然后,您的 higher-level 方法可以在此基础上构建:

public sealed class Client : IDisposable
{
  ...

  /// <summary>
  /// Sends a <c>public/auth</c> request using the <c>client_credentials</c> grant type.
  /// </summary>
  /// <param name="clientId">Sent as <c>client_id</c>.</param>
  /// <param name="clientSecret">Sent as <c>client_secret</c>.</param>
  /// <returns>The <c>Result</c> of the deserialized response.</returns>
  public async Task<AuthResponse?> AuthenticateAsync(string clientId, string clientSecret)
  {
    var credentials = new Dictionary<string, string>
    {
      ["grant_type"] = "client_credentials",
      ["client_id"] = clientId,
      ["client_secret"] = clientSecret,
    };

    var reply = await SendAsync<SocketResponse<AuthResponse>>("public/auth", credentials);
    return reply.Result;
  }
}

所以最终的代码是:

/// <summary>
/// JSON-RPC client: <see cref="SendAsync{TResponse}"/> registers an outstanding
/// request and <see cref="Start"/> completes it when a message with the same id arrives.
/// </summary>
public sealed class Client : IDisposable
{
  // Outstanding requests keyed by request id. The key is long so it matches the
  // id type produced by TryParseResponse.
  private readonly ConcurrentDictionary<long, TaskCompletionSource<JsonDocument>> _requests = new();

  /// <summary>
  /// Reads messages from the underlying client, completes the outstanding request
  /// whose id matches a response, and yields every other message to the caller.
  /// </summary>
  /// <param name="cancellationToken">Token forwarded to <c>_client.Start</c>.</param>
  /// <returns>The messages that did not match an outstanding request.</returns>
  public async IAsyncEnumerable<string> Start(CancellationToken cancellationToken)
  {
    await foreach (var message in _client.Start(cancellationToken))
    {
      var (requestId, response) = TryParseResponse(message);
      if (requestId != null && _requests.TryRemove(requestId.Value, out var tcs))
      {
        // Ownership of the document passes to SendAsync, which disposes it after
        // deserializing; if the task was already completed, dispose it here.
        if (!tcs.TrySetResult(response!))
        {
          response!.Dispose();
        }
      }
      else
      {
        // No outstanding request for this message: release the document (if any)
        // and pass the raw message through.
        response?.Dispose();
        yield return message;
      }
    }

    // Returns the request id and the parsed document when the message is a JSON
    // object with a numeric "id"; otherwise returns (null, null).
    (long? RequestId, JsonDocument? Response) TryParseResponse(string message)
    {
      JsonDocument? document = null;
      try
      {
        document = JsonDocument.Parse(message);
        var root = document.RootElement;

        // The ValueKind checks keep TryGetProperty/TryGetInt64 from throwing
        // InvalidOperationException on non-object roots or non-numeric ids.
        if (root.ValueKind == JsonValueKind.Object
            && root.TryGetProperty("id", out var idElement)
            && idElement.ValueKind == JsonValueKind.Number
            && idElement.TryGetInt64(out var id))
        {
          return (id, document);
        }
      }
      catch (JsonException)
      {
        // Not valid JSON: treat it as a non-response message.
      }

      document?.Dispose();
      return (null, null);
    }
  }

  /// <summary>
  /// Sends a JSON-RPC 2.0 request and asynchronously waits for the response that
  /// carries the same id.
  /// </summary>
  /// <typeparam name="TResponse">Type the response document is deserialized into.</typeparam>
  /// <param name="method">Value assigned to the request's <c>Method</c>.</param>
  /// <param name="params">Value assigned to the request's <c>Params</c>.</param>
  /// <returns>A task that produces the deserialized response.</returns>
  /// <exception cref="JsonException">
  /// The response cannot be deserialized into <typeparamref name="TResponse"/>;
  /// surfaced through the returned task.
  /// </exception>
  public Task<TResponse> SendAsync<TResponse>(string method, object @params)
  {
    var request = new JsonRpcRequest<object>
    {
      JsonRpc = "2.0",
      Id = NextId(),
      Method = method,
      Params = @params,
    };

    // Register the pending request before sending, so the response cannot arrive
    // before there is an entry to complete.
    var tcs = new TaskCompletionSource<JsonDocument>(TaskCreationOptions.RunContinuationsAsynchronously);
    _requests.TryAdd(request.Id, tcs);
    return SendRequestAndWaitForResponseAsync();

    async Task<TResponse> SendRequestAndWaitForResponseAsync()
    {
      try
      {
        var message = JsonSerializer.Serialize(request);
        await _client.SendAsync(message);
      }
      catch
      {
        // The request never went out, so no response will come: drop the entry
        // instead of leaving it in the dictionary forever.
        _requests.TryRemove(request.Id, out _);
        throw;
      }

      // The document holds pooled buffers; dispose it once deserialization is done.
      using var response = await tcs.Task;

      // The generic overload returns TResponse directly (the Type-based overload
      // returns object). Any JsonException propagates through the async state machine.
      return response.Deserialize<TResponse>()!;
    }
  }

  /// <summary>
  /// Sends a <c>public/auth</c> request using the <c>client_credentials</c> grant type.
  /// </summary>
  /// <param name="clientId">Sent as <c>client_id</c>.</param>
  /// <param name="clientSecret">Sent as <c>client_secret</c>.</param>
  /// <returns>The <c>Result</c> of the deserialized response.</returns>
  public async Task<AuthResponse?> AuthenticateAsync(string clientId, string clientSecret)
  {
    var @params = new Dictionary<string, string>
    {
      {"grant_type", "client_credentials"},
      {"client_id", clientId},
      {"client_secret", clientSecret}
    };

    var response = await SendAsync<SocketResponse<AuthResponse>>("public/auth", @params);
    return response.Result;
  }
}

您可能还想看看 David Fowler 的 Project Bedrock,它可能会大大简化这段代码。