JSON-RPC - 使用 TaskCompletionSource (SetException) 处理异常
JSON-RPC - Handling exceptions with TaskCompletionSource (SetException)
我目前正在实现一个 JSON-RPC API,可以在这里(here)进行测试。
问题是,如果将不正确的 DTO 模型传递给 SendAsync<TResponse>
,JsonSerializer.Deserialize
将抛出一个 JsonException
,我的代码没有处理它。我知道我必须以某种方式使用 SetException
,但我不知道该怎么做,所以这就是问题所在。异常消息也应该打印在控制台中。
/// <summary>
/// JSON-RPC client from the question. Requests are sent through <c>_client</c> and each
/// response is matched to its pending handler by the <c>id</c> property of the message.
/// </summary>
public sealed class Client : IDisposable
{
...
// Pending response handlers, keyed by the id of the request they are waiting for.
private readonly ConcurrentDictionary<long, IResponseHandler> _handlers = new();
...
/// <summary>
/// Starts a background loop that reads messages from <c>_client</c> and dispatches them.
/// </summary>
/// <param name="cancellationToken">Token passed to <c>_client.Start</c> and to <c>Task.Run</c>.</param>
/// <returns>An already-completed task; the task running the loop itself is discarded.</returns>
public Task StartAsync(CancellationToken cancellationToken)
{
// Fire-and-forget: the task is discarded, so an exception escaping the loop is not observed here.
_ = Task.Run(async () =>
{
await foreach (var message in _client.Start(cancellationToken))
{
// Parsing happens outside the try block, so a JsonException for malformed JSON is not caught below.
using var response = JsonDocument.Parse(message);
try
{
// NOTE(review): the id is read with GetInt32 although _handlers is keyed by long.
var requestId = response.RootElement.GetProperty("id").GetInt32();
// TODO: Handle JsonException errors via `SetException`?
// TODO: Show error when incorrect input parameters are filled
// NOTE(review): ContainsKey + indexer + TryRemove are three separate lookups, not one atomic operation.
if (_handlers.ContainsKey(requestId))
{
// SetResult deserializes the payload; if it throws, the TryRemove on the next line is skipped
// and the exception is not caught by the KeyNotFoundException handler below.
_handlers[requestId].SetResult(message);
_handlers.TryRemove(requestId, out _);
}
}
catch (KeyNotFoundException)
{
// A message without an `id` property is not a response to a request; it is treated as a
// web socket subscription message and published to the subject.
_messageReceivedSubject.OnNext(message);
}
}
}, cancellationToken);
...
return Task.CompletedTask;
}
/// <summary>
/// Builds a JSON-RPC 2.0 request, registers a handler under its id and sends the serialized request.
/// </summary>
/// <typeparam name="TResponse">Type the response payload is deserialized into.</typeparam>
/// <param name="method">JSON-RPC method name.</param>
/// <param name="params">Value assigned to the request's <c>Params</c>.</param>
/// <returns>The handler's task, completed when <c>SetResult</c> is called for this request id.</returns>
public Task<TResponse> SendAsync<TResponse>(string method, object @params)
{
var request = new JsonRpcRequest<object>
{
JsonRpc = "2.0",
Id = NextId(),
Method = method,
Params = @params
};
//var tcs = new TaskCompletionSource<TResponse>();
//_requestManager.Add(request.Id, request, tcs);
var handler = new ResponseHandlerBase<TResponse>();
// The handler is registered before sending, so it is already in place when the response arrives.
_handlers[request.Id] = handler;
var message = JsonSerializer.Serialize(request);
// The send is not awaited; a failure to send is not observed by this method.
_ = _client.SendAsync(message);
return handler.Task;
//return tcs.Task;
}
/// <summary>
/// Sends a <c>public/auth</c> request using the client-credentials grant.
/// </summary>
/// <param name="clientId">Value sent as <c>client_id</c>.</param>
/// <param name="clientSecret">Value sent as <c>client_secret</c>.</param>
/// <returns>The <c>Result</c> of the deserialized response.</returns>
public async Task<AuthResponse?> AuthenticateAsync(string clientId, string clientSecret)
{
var @params = new Dictionary<string, string>
{
{"grant_type", "client_credentials"},
{"client_id", clientId},
{"client_secret", clientSecret}
};
var response = await SendAsync<SocketResponse<AuthResponse>>("public/auth", @params).ConfigureAwait(false);
return response.Result;
}
...
/// <summary>Type-erased handler, so handlers for different response types can share one dictionary.</summary>
private interface IResponseHandler
{
/// <summary>Completes the handler from the raw response message.</summary>
void SetResult(string payload);
}
/// <summary>Handler that deserializes the raw payload into <typeparamref name="TRes"/>.</summary>
private class ResponseHandlerBase<TRes> : IResponseHandler
{
private readonly TaskCompletionSource<TRes> _tcs = new();
/// <summary>Task that completes once <see cref="SetResult"/> has run successfully.</summary>
public Task<TRes> Task => _tcs.Task;
/// <summary>Deserializes <paramref name="payload"/> into <typeparamref name="TRes"/> and completes the task with it.</summary>
public void SetResult(string payload)
{
// NOTE(review): there is no try/catch here, so if Deserialize throws, the exception goes to the
// caller of SetResult and _tcs is never completed (neither SetResult nor SetException is reached).
var result = JsonSerializer.Deserialize(payload, typeof(TRes));
_tcs.SetResult((TRes) result);
}
}
}
巧合的是,我上周在直播编写一个 TCP/IP 聊天应用程序时,做过一件非常类似的事情。
由于在这种情况下您已经有一个 IAsyncEnumerable<string>
- 并且由于您 可以 获得响应以外的消息 - 我建议还公开 IAsyncEnumerable<string>
:
/// <summary>
/// Client that re-exposes the message stream of the underlying <c>_client</c>.
/// </summary>
public sealed class Client : IDisposable
{
    /// <summary>
    /// Streams every message produced by <c>_client.Start</c>, unchanged and in order.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to <c>_client.Start</c>.</param>
    /// <returns>An asynchronous sequence of raw messages.</returns>
    public async IAsyncEnumerable<string> Start(CancellationToken cancellationToken)
    {
        var incoming = _client.Start(cancellationToken);

        await foreach (var rawMessage in incoming)
        {
            // TODO: parse and handle responses for our requests
            yield return rawMessage;
        }
    }
}
如果您愿意,可以将其更改为 Rx-based (_messageReceivedSubject.OnNext
),但我认为如果您已经拥有 IAsyncEnumerable<T>
,那么您最好保持相同的抽象。
然后,您可以解析和检测响应,传递所有其他消息:
/// <summary>
/// Client that separates responses (messages carrying a numeric <c>id</c>) from all other messages.
/// </summary>
public sealed class Client : IDisposable
{
    /// <summary>
    /// Reads messages from <c>_client</c>; messages that are not responses are passed through to the caller.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to <c>_client.Start</c>.</param>
    /// <returns>An asynchronous sequence of the messages that carry no request id.</returns>
    public async IAsyncEnumerable<string> Start(CancellationToken cancellationToken)
    {
        await foreach (var message in _client.Start(cancellationToken))
        {
            var (requestId, response) = TryParseResponse(message);
            if (requestId is not null)
            {
                // The parsed document is not consumed yet; dispose it instead of leaking it.
                using (response)
                {
                    // TODO: handle
                }
            }
            else
            {
                yield return message;
            }
        }

        // Returns (id, document) when the message is a JSON object with a numeric "id";
        // otherwise (null, null). Uses Try* APIs so ordinary non-response messages do not throw.
        static (long? RequestId, JsonDocument? Response) TryParseResponse(string message)
        {
            JsonDocument document;
            try
            {
                document = JsonDocument.Parse(message);
            }
            catch (JsonException)
            {
                // Not valid JSON: treat it as a non-response message.
                return (null, null);
            }

            var root = document.RootElement;
            if (root.ValueKind == JsonValueKind.Object
                && root.TryGetProperty("id", out var idElement)
                && idElement.ValueKind == JsonValueKind.Number
                && idElement.TryGetInt64(out var id))
            {
                // Ownership of the document passes to the caller.
                return (id, document);
            }

            document.Dispose();
            return (null, null);
        }
    }
}
然后,您可以定义未完成请求的集合并处理这些请求的消息:
/// <summary>
/// Client that completes outstanding requests when their response arrives and passes every
/// other message through to the caller.
/// </summary>
public sealed class Client : IDisposable
{
    // Outstanding requests keyed by request id. The key is long to match the parsed id type.
    private readonly ConcurrentDictionary<long, TaskCompletionSource<JsonDocument>> _requests = new();

    /// <summary>
    /// Reads messages from <c>_client</c>. A message whose <c>id</c> matches an outstanding request
    /// completes that request; all other messages are yielded to the caller.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to <c>_client.Start</c>.</param>
    /// <returns>An asynchronous sequence of the messages that did not complete a request.</returns>
    public async IAsyncEnumerable<string> Start(CancellationToken cancellationToken)
    {
        await foreach (var message in _client.Start(cancellationToken))
        {
            var (requestId, response) = TryParseResponse(message);

            // TryRemove looks up and removes in one atomic step, so a response is delivered at most once.
            if (requestId is { } id && response is not null && _requests.TryRemove(id, out var tcs))
            {
                // The awaiting side now owns (and must dispose) the document.
                tcs.TrySetResult(response);
            }
            else
            {
                // Nobody is waiting for this document; release it before forwarding the raw text.
                response?.Dispose();
                yield return message;
            }
        }

        // Returns (id, document) when the message is a JSON object with a numeric "id";
        // otherwise (null, null). Uses Try* APIs so ordinary non-response messages do not throw.
        static (long? RequestId, JsonDocument? Response) TryParseResponse(string message)
        {
            JsonDocument document;
            try
            {
                document = JsonDocument.Parse(message);
            }
            catch (JsonException)
            {
                // Not valid JSON: treat it as a non-response message.
                return (null, null);
            }

            var root = document.RootElement;
            if (root.ValueKind == JsonValueKind.Object
                && root.TryGetProperty("id", out var idElement)
                && idElement.ValueKind == JsonValueKind.Number
                && idElement.TryGetInt64(out var parsedId))
            {
                return (parsedId, document);
            }

            document.Dispose();
            return (null, null);
        }
    }
}
注意 ConcurrentDictionary.TryRemove
的用法,这比先访问值、然后再删除它更安全。
现在你可以编写通用的 SendAsync
。正如我在视频中指出的那样,我更喜欢将在 SendAsync
中同步运行的代码和 await
响应的代码分开:
public sealed class Client : IDisposable
{
    // ... (other members omitted)

    /// <summary>
    /// Sends a JSON-RPC 2.0 request and asynchronously waits for the response with the same id.
    /// </summary>
    /// <typeparam name="TResponse">Type the response document is deserialized into.</typeparam>
    /// <param name="method">JSON-RPC method name.</param>
    /// <param name="params">Value assigned to the request's <c>Params</c>.</param>
    /// <returns>A task producing the deserialized response.</returns>
    /// <exception cref="JsonException">
    /// Surfaced through the returned task when the response cannot be deserialized into
    /// <typeparamref name="TResponse"/>.
    /// </exception>
    public Task<TResponse> SendAsync<TResponse>(string method, object @params)
    {
        var request = new JsonRpcRequest<object>
        {
            JsonRpc = "2.0",
            Id = NextId(),
            Method = method,
            Params = @params,
        };

        // Register before sending so the response cannot arrive before anyone is listening for it.
        var tcs = new TaskCompletionSource<JsonDocument>(TaskCreationOptions.RunContinuationsAsynchronously);
        _requests.TryAdd(request.Id, tcs);
        return SendRequestAndWaitForResponseAsync();

        // Local async method: any exception below is captured by the async state machine and
        // faults the returned task, so no manual SetException call is needed.
        async Task<TResponse> SendRequestAndWaitForResponseAsync()
        {
            try
            {
                var message = JsonSerializer.Serialize(request);
                await _client.SendAsync(message);
            }
            catch
            {
                // The request never went out, so no response will come: drop the pending entry.
                _requests.TryRemove(request.Id, out _);
                throw;
            }

            // This method owns the document once the request completes; dispose it after use.
            using var response = await tcs.Task;

            // The generic extension returns TResponse directly; the Type-based overload returns object.
            var result = response.Deserialize<TResponse>();
            if (result is null)
            {
                throw new JsonException($"The response to request {request.Id} deserialized to null.");
            }

            return result;
        }
    }
}
我完全删除了“处理程序”概念,因为它只是为 JsonSerializer.Deserialize
提供类型。此外,通过使用本地 async
方法,我可以使用 async
状态机自然传播异常。
然后,您的 higher-level 方法可以在此基础上构建:
public sealed class Client : IDisposable
{
    // ... (other members omitted)

    /// <summary>
    /// Authenticates through the <c>public/auth</c> method using the client-credentials grant.
    /// </summary>
    /// <param name="clientId">Value sent as <c>client_id</c>.</param>
    /// <param name="clientSecret">Value sent as <c>client_secret</c>.</param>
    /// <returns>The <c>Result</c> of the deserialized response.</returns>
    public async Task<AuthResponse?> AuthenticateAsync(string clientId, string clientSecret)
    {
        var credentials = new Dictionary<string, string>
        {
            ["grant_type"] = "client_credentials",
            ["client_id"] = clientId,
            ["client_secret"] = clientSecret,
        };

        var reply = await SendAsync<SocketResponse<AuthResponse>>("public/auth", credentials);
        return reply.Result;
    }
}
所以最终的代码是:
/// <summary>
/// JSON-RPC client: correlates responses with outstanding requests by id and passes every
/// other message (e.g. subscription notifications) through to the caller of <see cref="Start"/>.
/// </summary>
public sealed class Client : IDisposable
{
    // Outstanding requests keyed by request id. The key is long to match the parsed id type.
    private readonly ConcurrentDictionary<long, TaskCompletionSource<JsonDocument>> _requests = new();

    /// <summary>
    /// Reads messages from <c>_client</c>. A message whose <c>id</c> matches an outstanding request
    /// completes that request; all other messages are yielded to the caller.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to <c>_client.Start</c>.</param>
    /// <returns>An asynchronous sequence of the messages that did not complete a request.</returns>
    public async IAsyncEnumerable<string> Start(CancellationToken cancellationToken)
    {
        await foreach (var message in _client.Start(cancellationToken))
        {
            var (requestId, response) = TryParseResponse(message);

            // TryRemove looks up and removes in one atomic step, so a response is delivered at most once.
            if (requestId is { } id && response is not null && _requests.TryRemove(id, out var tcs))
            {
                // The awaiting SendAsync call now owns (and disposes) the document.
                tcs.TrySetResult(response);
            }
            else
            {
                // Nobody is waiting for this document; release it before forwarding the raw text.
                response?.Dispose();
                yield return message;
            }
        }

        // Returns (id, document) when the message is a JSON object with a numeric "id";
        // otherwise (null, null). Uses Try* APIs so ordinary non-response messages do not throw.
        static (long? RequestId, JsonDocument? Response) TryParseResponse(string message)
        {
            JsonDocument document;
            try
            {
                document = JsonDocument.Parse(message);
            }
            catch (JsonException)
            {
                // Not valid JSON: treat it as a non-response message.
                return (null, null);
            }

            var root = document.RootElement;
            if (root.ValueKind == JsonValueKind.Object
                && root.TryGetProperty("id", out var idElement)
                && idElement.ValueKind == JsonValueKind.Number
                && idElement.TryGetInt64(out var parsedId))
            {
                return (parsedId, document);
            }

            document.Dispose();
            return (null, null);
        }
    }

    /// <summary>
    /// Sends a JSON-RPC 2.0 request and asynchronously waits for the response with the same id.
    /// </summary>
    /// <typeparam name="TResponse">Type the response document is deserialized into.</typeparam>
    /// <param name="method">JSON-RPC method name.</param>
    /// <param name="params">Value assigned to the request's <c>Params</c>.</param>
    /// <returns>A task producing the deserialized response.</returns>
    /// <exception cref="JsonException">
    /// Surfaced through the returned task when the response cannot be deserialized into
    /// <typeparamref name="TResponse"/>.
    /// </exception>
    public Task<TResponse> SendAsync<TResponse>(string method, object @params)
    {
        var request = new JsonRpcRequest<object>
        {
            JsonRpc = "2.0",
            Id = NextId(),
            Method = method,
            Params = @params,
        };

        // Register before sending so the response cannot arrive before anyone is listening for it.
        var tcs = new TaskCompletionSource<JsonDocument>(TaskCreationOptions.RunContinuationsAsynchronously);
        _requests.TryAdd(request.Id, tcs);
        return SendRequestAndWaitForResponseAsync();

        // Local async method: any exception below is captured by the async state machine and
        // faults the returned task, so no manual SetException call is needed.
        async Task<TResponse> SendRequestAndWaitForResponseAsync()
        {
            try
            {
                var message = JsonSerializer.Serialize(request);
                await _client.SendAsync(message);
            }
            catch
            {
                // The request never went out, so no response will come: drop the pending entry.
                _requests.TryRemove(request.Id, out _);
                throw;
            }

            // This method owns the document once the request completes; dispose it after use.
            using var response = await tcs.Task;

            // The generic extension returns TResponse directly; the Type-based overload returns object.
            var result = response.Deserialize<TResponse>();
            if (result is null)
            {
                throw new JsonException($"The response to request {request.Id} deserialized to null.");
            }

            return result;
        }
    }

    /// <summary>
    /// Authenticates through the <c>public/auth</c> method using the client-credentials grant.
    /// </summary>
    /// <param name="clientId">Value sent as <c>client_id</c>.</param>
    /// <param name="clientSecret">Value sent as <c>client_secret</c>.</param>
    /// <returns>The <c>Result</c> of the deserialized response.</returns>
    public async Task<AuthResponse?> AuthenticateAsync(string clientId, string clientSecret)
    {
        var @params = new Dictionary<string, string>
        {
            {"grant_type", "client_credentials"},
            {"client_id", clientId},
            {"client_secret", clientSecret}
        };
        var response = await SendAsync<SocketResponse<AuthResponse>>("public/auth", @params);
        return response.Result;
    }
}
您可能还想查看 David Fowler's Project Bedrock,这可能会大大简化此代码。
我目前正在实现一个 JSON-RPC API,可以在这里(here)进行测试。
问题是,如果将不正确的 DTO 模型传递给 SendAsync<TResponse>
,JsonSerializer.Deserialize
将抛出一个 JsonException
,我的代码没有处理它。我知道我必须以某种方式使用 SetException
,但我不知道该怎么做,所以这就是问题所在。异常消息也应该打印在控制台中。
/// <summary>
/// JSON-RPC client from the question. Requests are sent through <c>_client</c> and each
/// response is matched to its pending handler by the <c>id</c> property of the message.
/// </summary>
public sealed class Client : IDisposable
{
...
// Pending response handlers, keyed by the id of the request they are waiting for.
private readonly ConcurrentDictionary<long, IResponseHandler> _handlers = new();
...
/// <summary>
/// Starts a background loop that reads messages from <c>_client</c> and dispatches them.
/// </summary>
/// <param name="cancellationToken">Token passed to <c>_client.Start</c> and to <c>Task.Run</c>.</param>
/// <returns>An already-completed task; the task running the loop itself is discarded.</returns>
public Task StartAsync(CancellationToken cancellationToken)
{
// Fire-and-forget: the task is discarded, so an exception escaping the loop is not observed here.
_ = Task.Run(async () =>
{
await foreach (var message in _client.Start(cancellationToken))
{
// Parsing happens outside the try block, so a JsonException for malformed JSON is not caught below.
using var response = JsonDocument.Parse(message);
try
{
// NOTE(review): the id is read with GetInt32 although _handlers is keyed by long.
var requestId = response.RootElement.GetProperty("id").GetInt32();
// TODO: Handle JsonException errors via `SetException`?
// TODO: Show error when incorrect input parameters are filled
// NOTE(review): ContainsKey + indexer + TryRemove are three separate lookups, not one atomic operation.
if (_handlers.ContainsKey(requestId))
{
// SetResult deserializes the payload; if it throws, the TryRemove on the next line is skipped
// and the exception is not caught by the KeyNotFoundException handler below.
_handlers[requestId].SetResult(message);
_handlers.TryRemove(requestId, out _);
}
}
catch (KeyNotFoundException)
{
// A message without an `id` property is not a response to a request; it is treated as a
// web socket subscription message and published to the subject.
_messageReceivedSubject.OnNext(message);
}
}
}, cancellationToken);
...
return Task.CompletedTask;
}
/// <summary>
/// Builds a JSON-RPC 2.0 request, registers a handler under its id and sends the serialized request.
/// </summary>
/// <typeparam name="TResponse">Type the response payload is deserialized into.</typeparam>
/// <param name="method">JSON-RPC method name.</param>
/// <param name="params">Value assigned to the request's <c>Params</c>.</param>
/// <returns>The handler's task, completed when <c>SetResult</c> is called for this request id.</returns>
public Task<TResponse> SendAsync<TResponse>(string method, object @params)
{
var request = new JsonRpcRequest<object>
{
JsonRpc = "2.0",
Id = NextId(),
Method = method,
Params = @params
};
//var tcs = new TaskCompletionSource<TResponse>();
//_requestManager.Add(request.Id, request, tcs);
var handler = new ResponseHandlerBase<TResponse>();
// The handler is registered before sending, so it is already in place when the response arrives.
_handlers[request.Id] = handler;
var message = JsonSerializer.Serialize(request);
// The send is not awaited; a failure to send is not observed by this method.
_ = _client.SendAsync(message);
return handler.Task;
//return tcs.Task;
}
/// <summary>
/// Sends a <c>public/auth</c> request using the client-credentials grant.
/// </summary>
/// <param name="clientId">Value sent as <c>client_id</c>.</param>
/// <param name="clientSecret">Value sent as <c>client_secret</c>.</param>
/// <returns>The <c>Result</c> of the deserialized response.</returns>
public async Task<AuthResponse?> AuthenticateAsync(string clientId, string clientSecret)
{
var @params = new Dictionary<string, string>
{
{"grant_type", "client_credentials"},
{"client_id", clientId},
{"client_secret", clientSecret}
};
var response = await SendAsync<SocketResponse<AuthResponse>>("public/auth", @params).ConfigureAwait(false);
return response.Result;
}
...
/// <summary>Type-erased handler, so handlers for different response types can share one dictionary.</summary>
private interface IResponseHandler
{
/// <summary>Completes the handler from the raw response message.</summary>
void SetResult(string payload);
}
/// <summary>Handler that deserializes the raw payload into <typeparamref name="TRes"/>.</summary>
private class ResponseHandlerBase<TRes> : IResponseHandler
{
private readonly TaskCompletionSource<TRes> _tcs = new();
/// <summary>Task that completes once <see cref="SetResult"/> has run successfully.</summary>
public Task<TRes> Task => _tcs.Task;
/// <summary>Deserializes <paramref name="payload"/> into <typeparamref name="TRes"/> and completes the task with it.</summary>
public void SetResult(string payload)
{
// NOTE(review): there is no try/catch here, so if Deserialize throws, the exception goes to the
// caller of SetResult and _tcs is never completed (neither SetResult nor SetException is reached).
var result = JsonSerializer.Deserialize(payload, typeof(TRes));
_tcs.SetResult((TRes) result);
}
}
}
巧合的是,我上周在直播编写一个 TCP/IP 聊天应用程序时,做过一件非常类似的事情。
由于在这种情况下您已经有一个 IAsyncEnumerable<string>
- 并且由于您 可以 获得响应以外的消息 - 我建议还公开 IAsyncEnumerable<string>
:
/// <summary>
/// Client that re-exposes the message stream of the underlying <c>_client</c>.
/// </summary>
public sealed class Client : IDisposable
{
    /// <summary>
    /// Streams every message produced by <c>_client.Start</c>, unchanged and in order.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to <c>_client.Start</c>.</param>
    /// <returns>An asynchronous sequence of raw messages.</returns>
    public async IAsyncEnumerable<string> Start(CancellationToken cancellationToken)
    {
        var incoming = _client.Start(cancellationToken);

        await foreach (var rawMessage in incoming)
        {
            // TODO: parse and handle responses for our requests
            yield return rawMessage;
        }
    }
}
如果您愿意,可以将其更改为 Rx-based (_messageReceivedSubject.OnNext
),但我认为如果您已经拥有 IAsyncEnumerable<T>
,那么您最好保持相同的抽象。
然后,您可以解析和检测响应,传递所有其他消息:
/// <summary>
/// Client that separates responses (messages carrying a numeric <c>id</c>) from all other messages.
/// </summary>
public sealed class Client : IDisposable
{
    /// <summary>
    /// Reads messages from <c>_client</c>; messages that are not responses are passed through to the caller.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to <c>_client.Start</c>.</param>
    /// <returns>An asynchronous sequence of the messages that carry no request id.</returns>
    public async IAsyncEnumerable<string> Start(CancellationToken cancellationToken)
    {
        await foreach (var message in _client.Start(cancellationToken))
        {
            var (requestId, response) = TryParseResponse(message);
            if (requestId is not null)
            {
                // The parsed document is not consumed yet; dispose it instead of leaking it.
                using (response)
                {
                    // TODO: handle
                }
            }
            else
            {
                yield return message;
            }
        }

        // Returns (id, document) when the message is a JSON object with a numeric "id";
        // otherwise (null, null). Uses Try* APIs so ordinary non-response messages do not throw.
        static (long? RequestId, JsonDocument? Response) TryParseResponse(string message)
        {
            JsonDocument document;
            try
            {
                document = JsonDocument.Parse(message);
            }
            catch (JsonException)
            {
                // Not valid JSON: treat it as a non-response message.
                return (null, null);
            }

            var root = document.RootElement;
            if (root.ValueKind == JsonValueKind.Object
                && root.TryGetProperty("id", out var idElement)
                && idElement.ValueKind == JsonValueKind.Number
                && idElement.TryGetInt64(out var id))
            {
                // Ownership of the document passes to the caller.
                return (id, document);
            }

            document.Dispose();
            return (null, null);
        }
    }
}
然后,您可以定义未完成请求的集合并处理这些请求的消息:
/// <summary>
/// Client that completes outstanding requests when their response arrives and passes every
/// other message through to the caller.
/// </summary>
public sealed class Client : IDisposable
{
    // Outstanding requests keyed by request id. The key is long to match the parsed id type.
    private readonly ConcurrentDictionary<long, TaskCompletionSource<JsonDocument>> _requests = new();

    /// <summary>
    /// Reads messages from <c>_client</c>. A message whose <c>id</c> matches an outstanding request
    /// completes that request; all other messages are yielded to the caller.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to <c>_client.Start</c>.</param>
    /// <returns>An asynchronous sequence of the messages that did not complete a request.</returns>
    public async IAsyncEnumerable<string> Start(CancellationToken cancellationToken)
    {
        await foreach (var message in _client.Start(cancellationToken))
        {
            var (requestId, response) = TryParseResponse(message);

            // TryRemove looks up and removes in one atomic step, so a response is delivered at most once.
            if (requestId is { } id && response is not null && _requests.TryRemove(id, out var tcs))
            {
                // The awaiting side now owns (and must dispose) the document.
                tcs.TrySetResult(response);
            }
            else
            {
                // Nobody is waiting for this document; release it before forwarding the raw text.
                response?.Dispose();
                yield return message;
            }
        }

        // Returns (id, document) when the message is a JSON object with a numeric "id";
        // otherwise (null, null). Uses Try* APIs so ordinary non-response messages do not throw.
        static (long? RequestId, JsonDocument? Response) TryParseResponse(string message)
        {
            JsonDocument document;
            try
            {
                document = JsonDocument.Parse(message);
            }
            catch (JsonException)
            {
                // Not valid JSON: treat it as a non-response message.
                return (null, null);
            }

            var root = document.RootElement;
            if (root.ValueKind == JsonValueKind.Object
                && root.TryGetProperty("id", out var idElement)
                && idElement.ValueKind == JsonValueKind.Number
                && idElement.TryGetInt64(out var parsedId))
            {
                return (parsedId, document);
            }

            document.Dispose();
            return (null, null);
        }
    }
}
注意 ConcurrentDictionary.TryRemove
的用法,这比先访问值、然后再删除它更安全。
现在你可以编写通用的 SendAsync
。正如我在视频中指出的那样,我更喜欢将在 SendAsync
中同步运行的代码和 await
响应的代码分开:
public sealed class Client : IDisposable
{
    // ... (other members omitted)

    /// <summary>
    /// Sends a JSON-RPC 2.0 request and asynchronously waits for the response with the same id.
    /// </summary>
    /// <typeparam name="TResponse">Type the response document is deserialized into.</typeparam>
    /// <param name="method">JSON-RPC method name.</param>
    /// <param name="params">Value assigned to the request's <c>Params</c>.</param>
    /// <returns>A task producing the deserialized response.</returns>
    /// <exception cref="JsonException">
    /// Surfaced through the returned task when the response cannot be deserialized into
    /// <typeparamref name="TResponse"/>.
    /// </exception>
    public Task<TResponse> SendAsync<TResponse>(string method, object @params)
    {
        var request = new JsonRpcRequest<object>
        {
            JsonRpc = "2.0",
            Id = NextId(),
            Method = method,
            Params = @params,
        };

        // Register before sending so the response cannot arrive before anyone is listening for it.
        var tcs = new TaskCompletionSource<JsonDocument>(TaskCreationOptions.RunContinuationsAsynchronously);
        _requests.TryAdd(request.Id, tcs);
        return SendRequestAndWaitForResponseAsync();

        // Local async method: any exception below is captured by the async state machine and
        // faults the returned task, so no manual SetException call is needed.
        async Task<TResponse> SendRequestAndWaitForResponseAsync()
        {
            try
            {
                var message = JsonSerializer.Serialize(request);
                await _client.SendAsync(message);
            }
            catch
            {
                // The request never went out, so no response will come: drop the pending entry.
                _requests.TryRemove(request.Id, out _);
                throw;
            }

            // This method owns the document once the request completes; dispose it after use.
            using var response = await tcs.Task;

            // The generic extension returns TResponse directly; the Type-based overload returns object.
            var result = response.Deserialize<TResponse>();
            if (result is null)
            {
                throw new JsonException($"The response to request {request.Id} deserialized to null.");
            }

            return result;
        }
    }
}
我完全删除了“处理程序”概念,因为它只是为 JsonSerializer.Deserialize
提供类型。此外,通过使用本地 async
方法,我可以使用 async
状态机自然传播异常。
然后,您的 higher-level 方法可以在此基础上构建:
public sealed class Client : IDisposable
{
    // ... (other members omitted)

    /// <summary>
    /// Authenticates through the <c>public/auth</c> method using the client-credentials grant.
    /// </summary>
    /// <param name="clientId">Value sent as <c>client_id</c>.</param>
    /// <param name="clientSecret">Value sent as <c>client_secret</c>.</param>
    /// <returns>The <c>Result</c> of the deserialized response.</returns>
    public async Task<AuthResponse?> AuthenticateAsync(string clientId, string clientSecret)
    {
        var credentials = new Dictionary<string, string>
        {
            ["grant_type"] = "client_credentials",
            ["client_id"] = clientId,
            ["client_secret"] = clientSecret,
        };

        var reply = await SendAsync<SocketResponse<AuthResponse>>("public/auth", credentials);
        return reply.Result;
    }
}
所以最终的代码是:
/// <summary>
/// JSON-RPC client: correlates responses with outstanding requests by id and passes every
/// other message (e.g. subscription notifications) through to the caller of <see cref="Start"/>.
/// </summary>
public sealed class Client : IDisposable
{
    // Outstanding requests keyed by request id. The key is long to match the parsed id type.
    private readonly ConcurrentDictionary<long, TaskCompletionSource<JsonDocument>> _requests = new();

    /// <summary>
    /// Reads messages from <c>_client</c>. A message whose <c>id</c> matches an outstanding request
    /// completes that request; all other messages are yielded to the caller.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to <c>_client.Start</c>.</param>
    /// <returns>An asynchronous sequence of the messages that did not complete a request.</returns>
    public async IAsyncEnumerable<string> Start(CancellationToken cancellationToken)
    {
        await foreach (var message in _client.Start(cancellationToken))
        {
            var (requestId, response) = TryParseResponse(message);

            // TryRemove looks up and removes in one atomic step, so a response is delivered at most once.
            if (requestId is { } id && response is not null && _requests.TryRemove(id, out var tcs))
            {
                // The awaiting SendAsync call now owns (and disposes) the document.
                tcs.TrySetResult(response);
            }
            else
            {
                // Nobody is waiting for this document; release it before forwarding the raw text.
                response?.Dispose();
                yield return message;
            }
        }

        // Returns (id, document) when the message is a JSON object with a numeric "id";
        // otherwise (null, null). Uses Try* APIs so ordinary non-response messages do not throw.
        static (long? RequestId, JsonDocument? Response) TryParseResponse(string message)
        {
            JsonDocument document;
            try
            {
                document = JsonDocument.Parse(message);
            }
            catch (JsonException)
            {
                // Not valid JSON: treat it as a non-response message.
                return (null, null);
            }

            var root = document.RootElement;
            if (root.ValueKind == JsonValueKind.Object
                && root.TryGetProperty("id", out var idElement)
                && idElement.ValueKind == JsonValueKind.Number
                && idElement.TryGetInt64(out var parsedId))
            {
                return (parsedId, document);
            }

            document.Dispose();
            return (null, null);
        }
    }

    /// <summary>
    /// Sends a JSON-RPC 2.0 request and asynchronously waits for the response with the same id.
    /// </summary>
    /// <typeparam name="TResponse">Type the response document is deserialized into.</typeparam>
    /// <param name="method">JSON-RPC method name.</param>
    /// <param name="params">Value assigned to the request's <c>Params</c>.</param>
    /// <returns>A task producing the deserialized response.</returns>
    /// <exception cref="JsonException">
    /// Surfaced through the returned task when the response cannot be deserialized into
    /// <typeparamref name="TResponse"/>.
    /// </exception>
    public Task<TResponse> SendAsync<TResponse>(string method, object @params)
    {
        var request = new JsonRpcRequest<object>
        {
            JsonRpc = "2.0",
            Id = NextId(),
            Method = method,
            Params = @params,
        };

        // Register before sending so the response cannot arrive before anyone is listening for it.
        var tcs = new TaskCompletionSource<JsonDocument>(TaskCreationOptions.RunContinuationsAsynchronously);
        _requests.TryAdd(request.Id, tcs);
        return SendRequestAndWaitForResponseAsync();

        // Local async method: any exception below is captured by the async state machine and
        // faults the returned task, so no manual SetException call is needed.
        async Task<TResponse> SendRequestAndWaitForResponseAsync()
        {
            try
            {
                var message = JsonSerializer.Serialize(request);
                await _client.SendAsync(message);
            }
            catch
            {
                // The request never went out, so no response will come: drop the pending entry.
                _requests.TryRemove(request.Id, out _);
                throw;
            }

            // This method owns the document once the request completes; dispose it after use.
            using var response = await tcs.Task;

            // The generic extension returns TResponse directly; the Type-based overload returns object.
            var result = response.Deserialize<TResponse>();
            if (result is null)
            {
                throw new JsonException($"The response to request {request.Id} deserialized to null.");
            }

            return result;
        }
    }

    /// <summary>
    /// Authenticates through the <c>public/auth</c> method using the client-credentials grant.
    /// </summary>
    /// <param name="clientId">Value sent as <c>client_id</c>.</param>
    /// <param name="clientSecret">Value sent as <c>client_secret</c>.</param>
    /// <returns>The <c>Result</c> of the deserialized response.</returns>
    public async Task<AuthResponse?> AuthenticateAsync(string clientId, string clientSecret)
    {
        var @params = new Dictionary<string, string>
        {
            {"grant_type", "client_credentials"},
            {"client_id", clientId},
            {"client_secret", clientSecret}
        };
        var response = await SendAsync<SocketResponse<AuthResponse>>("public/auth", @params);
        return response.Result;
    }
}
您可能还想查看 David Fowler's Project Bedrock,这可能会大大简化此代码。