Request/Response 模式与 TPL 数据流
Request/Response pattern with TPL Dataflow
我们在使用 TPL 数据流库时遇到问题,需要 request/response 模式。我们的问题是我们有一个调用依赖服务的 .NET 核心 API。依赖服务限制并发请求。我们的API不限制并发请求;因此,我们一次可以收到数千个请求。在这种情况下,依赖服务将在达到其限制后拒绝请求。因此,我们实现了一个BufferBlock<T>
和一个TransformBlock<TIn, TOut>
。表演很扎实,效果很好。我们测试了我们的 API 前端,有 1000 个用户发布了 100 个 requests/sec,0 个问题。缓冲块缓冲请求,转换块并行执行我们所需数量的请求。依赖服务接收我们的请求并响应。我们 return 在变换块操作中的响应一切都很好。我们的问题是缓冲块和转换块断开连接,这意味着 requests/responses 不同步。我们遇到了一个请求将收到另一个请求者的响应的问题(请参阅下面的代码)。
具体到下面的代码,我们的问题出在了GetContent
这个方法上。该方法是从我们 API 中的服务层调用的,最终是从我们的控制器调用的。下面的代码和服务层都是单例。缓冲区的 SendAsync
与转换块 ReceiveAsync
断开连接,因此可以 return 编辑任意响应,不一定是发出的请求。
所以,我们的问题是:有没有办法使用数据流块来关联 request/responses?最终目标是请求进入我们的 API,被发送到依赖服务,然后被 return 发送到客户端。我们的数据流实现代码如下。
public class HttpClientWrapper : IHttpClientManager
{
private readonly IConfiguration _configuration;
private readonly ITokenService _tokenService;
private HttpClient _client;
private BufferBlock<string> _bufferBlock;
private TransformBlock<string, JObject> _actionBlock;
public HttpClientWrapper(IConfiguration configuration, ITokenService tokenService)
{
_configuration = configuration;
_tokenService = tokenService;
_bufferBlock = new BufferBlock<string>();
var executionDataFlowBlockOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10
};
var dataFlowLinkOptions = new DataflowLinkOptions
{
PropagateCompletion = true
};
_actionBlock = new TransformBlock<string, JObject>(t => ProcessRequest(t),
executionDataFlowBlockOptions);
_bufferBlock.LinkTo(_actionBlock, dataFlowLinkOptions);
}
public void Connect()
{
_client = new HttpClient();
_client.DefaultRequestHeaders.Add("x-ms-client-application-name",
"ourappname");
}
public async Task<JObject> GetContent(string request)
{
await _bufferBlock.SendAsync(request);
var result = await _actionBlock.ReceiveAsync();
return result;
}
private async Task<JObject> ProcessRequest(string request)
{
if (_client == null)
{
Connect();
}
try
{
var accessToken = await _tokenService.GetTokenAsync(_configuration);
var httpRequestMessage = new HttpRequestMessage(HttpMethod.Post,
new Uri($"https://{_configuration.Uri}"));
// add the headers
httpRequestMessage.Headers.Add("Authorization", $"Bearer {accessToken}");
// add the request body
httpRequestMessage.Content = new StringContent(request, Encoding.UTF8,
"application/json");
var postRequest = await _client.SendAsync(httpRequestMessage);
var response = await postRequest.Content.ReadAsStringAsync();
return JsonConvert.DeserializeObject<JObject>(response);
}
catch (Exception ex)
{
// log error
return new JObject();
}
}
}
您需要做的是用一个 id 标记每个传入的项目,以便您可以将数据输入与结果输出相关联。这是一个如何做到这一点的例子:
namespace ConcurrentFlows.DataflowJobs {
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
/// <summary>
/// A generic interface defining that:
/// for a specified input type => an awaitable result is produced.
/// </summary>
/// <typeparam name="TInput">The type of data to process.</typeparam>
/// <typeparam name="TOutput">The type of data the consumer expects back.</typeparam>
public interface IJobManager<TInput, TOutput> {
Task<TOutput> SubmitRequest(TInput data);
}
/// <summary>
/// A TPL-Dataflow based job manager.
/// </summary>
/// <typeparam name="TInput">The type of data to process.</typeparam>
/// <typeparam name="TOutput">The type of data the consumer expects back.</typeparam>
public class DataflowJobManager<TInput, TOutput> : IJobManager<TInput, TOutput> {
/// <summary>
/// It is anticipated that jobHandler is an injected
/// singleton instance of a Dataflow based 'calculator', though this implementation
/// does not depend on it being a singleton.
/// </summary>
/// <param name="jobHandler">A singleton Dataflow block through which all jobs are processed.</param>
public DataflowJobManager(IPropagatorBlock<KeyValuePair<Guid, TInput>, KeyValuePair<Guid, TOutput>> jobHandler) {
if (jobHandler == null) { throw new ArgumentException("Argument cannot be null.", "jobHandler"); }
this.JobHandler = JobHandler;
if (!alreadyLinked) {
JobHandler.LinkTo(ResultHandler, new DataflowLinkOptions() { PropagateCompletion = true });
alreadyLinked = true;
}
}
private static bool alreadyLinked = false;
/// <summary>
/// Submits the request to the JobHandler and asynchronously awaits the result.
/// </summary>
/// <param name="data">The input data to be processd.</param>
/// <returns></returns>
public async Task<TOutput> SubmitRequest(TInput data) {
var taggedData = TagInputData(data);
var job = CreateJob(taggedData);
Jobs.TryAdd(job.Key, job.Value);
await JobHandler.SendAsync(taggedData);
return await job.Value.Task;
}
private static ConcurrentDictionary<Guid, TaskCompletionSource<TOutput>> Jobs {
get;
} = new ConcurrentDictionary<Guid, TaskCompletionSource<TOutput>>();
private static ExecutionDataflowBlockOptions Options {
get;
} = GetResultHandlerOptions();
private static ITargetBlock<KeyValuePair<Guid, TOutput>> ResultHandler {
get;
} = CreateReplyHandler(Options);
private IPropagatorBlock<KeyValuePair<Guid, TInput>, KeyValuePair<Guid, TOutput>> JobHandler {
get;
}
private KeyValuePair<Guid, TInput> TagInputData(TInput data) {
var id = Guid.NewGuid();
return new KeyValuePair<Guid, TInput>(id, data);
}
private KeyValuePair<Guid, TaskCompletionSource<TOutput>> CreateJob(KeyValuePair<Guid, TInput> taggedData) {
var id = taggedData.Key;
var jobCompletionSource = new TaskCompletionSource<TOutput>();
return new KeyValuePair<Guid, TaskCompletionSource<TOutput>>(id, jobCompletionSource);
}
private static ExecutionDataflowBlockOptions GetResultHandlerOptions() {
return new ExecutionDataflowBlockOptions() {
MaxDegreeOfParallelism = Environment.ProcessorCount,
BoundedCapacity = 1000
};
}
private static ITargetBlock<KeyValuePair<Guid, TOutput>> CreateReplyHandler(ExecutionDataflowBlockOptions options) {
return new ActionBlock<KeyValuePair<Guid, TOutput>>((result) => {
RecieveOutput(result);
}, options);
}
private static void RecieveOutput(KeyValuePair<Guid, TOutput> result) {
var jobId = result.Key;
TaskCompletionSource<TOutput> jobCompletionSource;
if (!Jobs.TryRemove(jobId, out jobCompletionSource)) {
throw new InvalidOperationException($"The jobId: {jobId} was not found.");
}
var resultValue = result.Value;
jobCompletionSource.SetResult(resultValue);
}
}
}
另见 以供参考。
感谢 JSteward 提供的答案。他的方法是完全可以接受的;但是,我最终通过使用 SemaphoreSlim 来完成此操作。 SemaphoreSlim 提供了两个功能,使其成为一个强大的解决方案。首先,它提供了一个构造函数重载,您可以在其中发送一个计数。此计数指的是能够通过信号量等待机制的并发项目数。等待机制由称为 WaitAsync 的方法提供。使用以下方法,其中 Worker class 作为单例,并发请求进入,每次执行 http 请求时限制为 10 个,并且响应全部返回到正确的请求。因此,实现可能如下所示:
public class Worker: IWorker
{
private readonly IHttpClientManager _httpClient;
private readonly ITokenService _tokenService;
private readonly SemaphoreSlim _semaphore;
public Worker(IHttpClientManager httpClient, ITokenService tokenService)
{
_httpClient = httpClient;
_tokenService = tokenService;
// we want to limit the number of items here
_semaphore = new SemaphoreSlim(10);
}
public async Task<JObject> ProcessRequestAsync(string request, string route)
{
try
{
var accessToken = await _tokenService.GetTokenAsync(
_timeSeriesConfiguration.TenantId,
_timeSeriesConfiguration.ClientId,
_timeSeriesConfiguration.ClientSecret);
var cancellationToken = new CancellationTokenSource();
cancellationToken.CancelAfter(30000);
await _semaphore.WaitAsync(cancellationToken.Token);
var httpResponseMessage = await _httpClient.SendAsync(new HttpClientRequest
{
Method = HttpMethod.Post,
Uri = $"https://someuri/someroute",
Token = accessToken,
Content = request
});
var response = await httpResponseMessage.Content.ReadAsStringAsync();
return response;
}
catch (Exception ex)
{
// do some logging
throw;
}
finally
{
_semaphore.Release();
}
}
}
进行简单的节流并不是 TPL 数据流库特别有吸引力的用例,而使用 SemaphoreSlim
似乎更简单也更有吸引力。但是,如果您想要更多功能,例如为每个请求强制执行最短持续时间,或者有一种方法可以等待所有待处理的请求完成,那么 TPL 数据流可以提供 SemaphoreSlim
无法提供的功能.基本思想是避免将裸输入值传递给块,然后尝试将它们与生成的结果相关联。根据请求立即创建任务、将任务发送到 ActionBlock<Task>
并让块激活并 await
使用其指定的 MaxDegreeOfParallelism
异步执行这些任务要安全得多。这样输入值及其结果将永远明确地绑定在一起。
public class ThrottledExecution<T>
{
private readonly ActionBlock<Task<Task<T>>> _actionBlock;
private readonly CancellationToken _cancellationToken;
public ThrottledExecution(int concurrencyLevel, int minDurationMilliseconds = 0,
CancellationToken cancellationToken = default)
{
if (minDurationMilliseconds < 0) throw new ArgumentOutOfRangeException();
_actionBlock = new ActionBlock<Task<Task<T>>>(async task =>
{
try
{
var delay = Task.Delay(minDurationMilliseconds, cancellationToken);
task.RunSynchronously();
await task.Unwrap().ConfigureAwait(false);
await delay.ConfigureAwait(false);
}
catch { } // Ignore exceptions (errors are propagated through the task)
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = concurrencyLevel,
CancellationToken = cancellationToken,
});
_cancellationToken = cancellationToken;
}
public Task<T> Run(Func<Task<T>> function)
{
// Create a cold task (the function will be invoked later)
var task = new Task<Task<T>>(function, _cancellationToken);
var accepted = _actionBlock.Post(task);
if (!accepted)
{
_cancellationToken.ThrowIfCancellationRequested();
throw new InvalidOperationException(
"The component has been marked as complete.");
}
return task.Unwrap();
}
public void Complete() => _actionBlock.Complete();
public Task Completion => _actionBlock.Completion;
}
用法示例:
private ThrottledExecution<JObject> throttledExecution
= new ThrottledExecution<JObject>(concurrencyLevel: 10);
public Task<JObject> GetContent(string request)
{
return throttledExecution.Run(() => ProcessRequest(request));
}
我们在使用 TPL 数据流库时遇到问题,需要 request/response 模式。我们的问题是我们有一个调用依赖服务的 .NET 核心 API。依赖服务限制并发请求。我们的API不限制并发请求;因此,我们一次可以收到数千个请求。在这种情况下,依赖服务将在达到其限制后拒绝请求。因此,我们实现了一个BufferBlock<T>
和一个TransformBlock<TIn, TOut>
。表演很扎实,效果很好。我们测试了我们的 API 前端,有 1000 个用户发布了 100 个 requests/sec,0 个问题。缓冲块缓冲请求,转换块并行执行我们所需数量的请求。依赖服务接收我们的请求并响应。我们 return 在变换块操作中的响应一切都很好。我们的问题是缓冲块和转换块断开连接,这意味着 requests/responses 不同步。我们遇到了一个请求将收到另一个请求者的响应的问题(请参阅下面的代码)。
具体到下面的代码,我们的问题出在了GetContent
这个方法上。该方法是从我们 API 中的服务层调用的,最终是从我们的控制器调用的。下面的代码和服务层都是单例。缓冲区的 SendAsync
与转换块 ReceiveAsync
断开连接,因此可以 return 编辑任意响应,不一定是发出的请求。
所以,我们的问题是:有没有办法使用数据流块来关联 request/responses?最终目标是请求进入我们的 API,被发送到依赖服务,然后被 return 发送到客户端。我们的数据流实现代码如下。
public class HttpClientWrapper : IHttpClientManager
{
private readonly IConfiguration _configuration;
private readonly ITokenService _tokenService;
private HttpClient _client;
private BufferBlock<string> _bufferBlock;
private TransformBlock<string, JObject> _actionBlock;
public HttpClientWrapper(IConfiguration configuration, ITokenService tokenService)
{
_configuration = configuration;
_tokenService = tokenService;
_bufferBlock = new BufferBlock<string>();
var executionDataFlowBlockOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10
};
var dataFlowLinkOptions = new DataflowLinkOptions
{
PropagateCompletion = true
};
_actionBlock = new TransformBlock<string, JObject>(t => ProcessRequest(t),
executionDataFlowBlockOptions);
_bufferBlock.LinkTo(_actionBlock, dataFlowLinkOptions);
}
public void Connect()
{
_client = new HttpClient();
_client.DefaultRequestHeaders.Add("x-ms-client-application-name",
"ourappname");
}
public async Task<JObject> GetContent(string request)
{
await _bufferBlock.SendAsync(request);
var result = await _actionBlock.ReceiveAsync();
return result;
}
private async Task<JObject> ProcessRequest(string request)
{
if (_client == null)
{
Connect();
}
try
{
var accessToken = await _tokenService.GetTokenAsync(_configuration);
var httpRequestMessage = new HttpRequestMessage(HttpMethod.Post,
new Uri($"https://{_configuration.Uri}"));
// add the headers
httpRequestMessage.Headers.Add("Authorization", $"Bearer {accessToken}");
// add the request body
httpRequestMessage.Content = new StringContent(request, Encoding.UTF8,
"application/json");
var postRequest = await _client.SendAsync(httpRequestMessage);
var response = await postRequest.Content.ReadAsStringAsync();
return JsonConvert.DeserializeObject<JObject>(response);
}
catch (Exception ex)
{
// log error
return new JObject();
}
}
}
您需要做的是用一个 id 标记每个传入的项目,以便您可以将数据输入与结果输出相关联。这是一个如何做到这一点的例子:
namespace ConcurrentFlows.DataflowJobs {
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
/// <summary>
/// A generic interface defining that:
/// for a specified input type => an awaitable result is produced.
/// </summary>
/// <typeparam name="TInput">The type of data to process.</typeparam>
/// <typeparam name="TOutput">The type of data the consumer expects back.</typeparam>
public interface IJobManager<TInput, TOutput> {
Task<TOutput> SubmitRequest(TInput data);
}
/// <summary>
/// A TPL-Dataflow based job manager.
/// </summary>
/// <typeparam name="TInput">The type of data to process.</typeparam>
/// <typeparam name="TOutput">The type of data the consumer expects back.</typeparam>
public class DataflowJobManager<TInput, TOutput> : IJobManager<TInput, TOutput> {
/// <summary>
/// It is anticipated that jobHandler is an injected
/// singleton instance of a Dataflow based 'calculator', though this implementation
/// does not depend on it being a singleton.
/// </summary>
/// <param name="jobHandler">A singleton Dataflow block through which all jobs are processed.</param>
public DataflowJobManager(IPropagatorBlock<KeyValuePair<Guid, TInput>, KeyValuePair<Guid, TOutput>> jobHandler) {
if (jobHandler == null) { throw new ArgumentException("Argument cannot be null.", "jobHandler"); }
this.JobHandler = JobHandler;
if (!alreadyLinked) {
JobHandler.LinkTo(ResultHandler, new DataflowLinkOptions() { PropagateCompletion = true });
alreadyLinked = true;
}
}
private static bool alreadyLinked = false;
/// <summary>
/// Submits the request to the JobHandler and asynchronously awaits the result.
/// </summary>
/// <param name="data">The input data to be processd.</param>
/// <returns></returns>
public async Task<TOutput> SubmitRequest(TInput data) {
var taggedData = TagInputData(data);
var job = CreateJob(taggedData);
Jobs.TryAdd(job.Key, job.Value);
await JobHandler.SendAsync(taggedData);
return await job.Value.Task;
}
private static ConcurrentDictionary<Guid, TaskCompletionSource<TOutput>> Jobs {
get;
} = new ConcurrentDictionary<Guid, TaskCompletionSource<TOutput>>();
private static ExecutionDataflowBlockOptions Options {
get;
} = GetResultHandlerOptions();
private static ITargetBlock<KeyValuePair<Guid, TOutput>> ResultHandler {
get;
} = CreateReplyHandler(Options);
private IPropagatorBlock<KeyValuePair<Guid, TInput>, KeyValuePair<Guid, TOutput>> JobHandler {
get;
}
private KeyValuePair<Guid, TInput> TagInputData(TInput data) {
var id = Guid.NewGuid();
return new KeyValuePair<Guid, TInput>(id, data);
}
private KeyValuePair<Guid, TaskCompletionSource<TOutput>> CreateJob(KeyValuePair<Guid, TInput> taggedData) {
var id = taggedData.Key;
var jobCompletionSource = new TaskCompletionSource<TOutput>();
return new KeyValuePair<Guid, TaskCompletionSource<TOutput>>(id, jobCompletionSource);
}
private static ExecutionDataflowBlockOptions GetResultHandlerOptions() {
return new ExecutionDataflowBlockOptions() {
MaxDegreeOfParallelism = Environment.ProcessorCount,
BoundedCapacity = 1000
};
}
private static ITargetBlock<KeyValuePair<Guid, TOutput>> CreateReplyHandler(ExecutionDataflowBlockOptions options) {
return new ActionBlock<KeyValuePair<Guid, TOutput>>((result) => {
RecieveOutput(result);
}, options);
}
private static void RecieveOutput(KeyValuePair<Guid, TOutput> result) {
var jobId = result.Key;
TaskCompletionSource<TOutput> jobCompletionSource;
if (!Jobs.TryRemove(jobId, out jobCompletionSource)) {
throw new InvalidOperationException($"The jobId: {jobId} was not found.");
}
var resultValue = result.Value;
jobCompletionSource.SetResult(resultValue);
}
}
}
另见
感谢 JSteward 提供的答案。他的方法是完全可以接受的;但是,我最终通过使用 SemaphoreSlim 来完成此操作。 SemaphoreSlim 提供了两个功能,使其成为一个强大的解决方案。首先,它提供了一个构造函数重载,您可以在其中发送一个计数。此计数指的是能够通过信号量等待机制的并发项目数。等待机制由称为 WaitAsync 的方法提供。使用以下方法,其中 Worker class 作为单例,并发请求进入,每次执行 http 请求时限制为 10 个,并且响应全部返回到正确的请求。因此,实现可能如下所示:
public class Worker: IWorker
{
private readonly IHttpClientManager _httpClient;
private readonly ITokenService _tokenService;
private readonly SemaphoreSlim _semaphore;
public Worker(IHttpClientManager httpClient, ITokenService tokenService)
{
_httpClient = httpClient;
_tokenService = tokenService;
// we want to limit the number of items here
_semaphore = new SemaphoreSlim(10);
}
public async Task<JObject> ProcessRequestAsync(string request, string route)
{
try
{
var accessToken = await _tokenService.GetTokenAsync(
_timeSeriesConfiguration.TenantId,
_timeSeriesConfiguration.ClientId,
_timeSeriesConfiguration.ClientSecret);
var cancellationToken = new CancellationTokenSource();
cancellationToken.CancelAfter(30000);
await _semaphore.WaitAsync(cancellationToken.Token);
var httpResponseMessage = await _httpClient.SendAsync(new HttpClientRequest
{
Method = HttpMethod.Post,
Uri = $"https://someuri/someroute",
Token = accessToken,
Content = request
});
var response = await httpResponseMessage.Content.ReadAsStringAsync();
return response;
}
catch (Exception ex)
{
// do some logging
throw;
}
finally
{
_semaphore.Release();
}
}
}
进行简单的节流并不是 TPL 数据流库特别有吸引力的用例,而使用 SemaphoreSlim
似乎更简单也更有吸引力。但是,如果您想要更多功能,例如为每个请求强制执行最短持续时间,或者有一种方法可以等待所有待处理的请求完成,那么 TPL 数据流可以提供 SemaphoreSlim
无法提供的功能.基本思想是避免将裸输入值传递给块,然后尝试将它们与生成的结果相关联。根据请求立即创建任务、将任务发送到 ActionBlock<Task>
并让块激活并 await
使用其指定的 MaxDegreeOfParallelism
异步执行这些任务要安全得多。这样输入值及其结果将永远明确地绑定在一起。
public class ThrottledExecution<T>
{
private readonly ActionBlock<Task<Task<T>>> _actionBlock;
private readonly CancellationToken _cancellationToken;
public ThrottledExecution(int concurrencyLevel, int minDurationMilliseconds = 0,
CancellationToken cancellationToken = default)
{
if (minDurationMilliseconds < 0) throw new ArgumentOutOfRangeException();
_actionBlock = new ActionBlock<Task<Task<T>>>(async task =>
{
try
{
var delay = Task.Delay(minDurationMilliseconds, cancellationToken);
task.RunSynchronously();
await task.Unwrap().ConfigureAwait(false);
await delay.ConfigureAwait(false);
}
catch { } // Ignore exceptions (errors are propagated through the task)
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = concurrencyLevel,
CancellationToken = cancellationToken,
});
_cancellationToken = cancellationToken;
}
public Task<T> Run(Func<Task<T>> function)
{
// Create a cold task (the function will be invoked later)
var task = new Task<Task<T>>(function, _cancellationToken);
var accepted = _actionBlock.Post(task);
if (!accepted)
{
_cancellationToken.ThrowIfCancellationRequested();
throw new InvalidOperationException(
"The component has been marked as complete.");
}
return task.Unwrap();
}
public void Complete() => _actionBlock.Complete();
public Task Completion => _actionBlock.Completion;
}
用法示例:
private ThrottledExecution<JObject> throttledExecution
= new ThrottledExecution<JObject>(concurrencyLevel: 10);
public Task<JObject> GetContent(string request)
{
return throttledExecution.Run(() => ProcessRequest(request));
}