How to buffer and batch REST GET queries with ASP.NET Web API 2 (using TPL/Reactive)?
I'm running a public-facing API that is currently struggling to scale. This API is built on ASP.NET Web API 2 (.NET 4.7.2). The problem I have is that, to serve its REST interface, the service has to make many calls from this entry point to other internal services.
We have already made some optimizations:
- Made all of these calls asynchronous (async/await).
- All services (the public-facing one as well as the internal ones) are load-balanced, and we have deployed 4 servers with plenty of RAM/CPU (64 GB and 8 CPUs each).
But when we get a sudden burst of load, or when we run stress tests, we find that we struggle to scale: response times start to climb and we cannot get beyond 150 req/s, with an average response time of 2.5 seconds, and almost all of that time seems to be spent waiting on network latency for each internal service to respond...
So I was wondering whether it would be possible to buffer a bunch of requests and call the internal APIs in batches to fetch the details to combine, and then answer the callers.
My idea is to have a special kind of static HttpClient with an async method that buffers the calls and flushes them either when a certain number of calls have been buffered, or when a limit of a few milliseconds has elapsed: that way, when we are under load, our API would make far fewer network calls and respond faster...
I know some people also use a MOM/bus such as Kafka for this, but it seems to me that would only give us more parallel calls to handle, with no real gain in speed... (I may be wrong.)
To illustrate my idea:
Do you think this could be done with Reactive Extensions (observing an elapsed time window or the number of buffered messages) or TPL Dataflow (filling a block and then making a batched call)? I have the idea, but I don't know whether it is a good one, or how to make it work... A rough Rx sketch of what I have in mind follows below.
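For what it's worth, a minimal sketch of the Rx variant I have in mind, assuming the System.Reactive package; the names BufferedBatchClient, batchCall, maxDelay and maxBatchSize are just placeholders. The Buffer(maxDelay, maxBatchSize) overload flushes a batch either when the delay elapses or when the count is reached, whichever comes first. This is only meant to illustrate the idea, not production code:

using System;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

public class BufferedBatchClient<TInput, TOutput>
{
    // Each buffered call carries its input and a TaskCompletionSource
    // that the caller awaits.
    private class PendingRequest
    {
        public TInput Input;
        public readonly TaskCompletionSource<TOutput> Tcs =
            new TaskCompletionSource<TOutput>(TaskCreationOptions.RunContinuationsAsynchronously);
    }

    private readonly Subject<PendingRequest> _requests = new Subject<PendingRequest>();

    public BufferedBatchClient(Func<TInput[], Task<TOutput[]>> batchCall,
        TimeSpan maxDelay, int maxBatchSize)
    {
        // Flush either after maxDelay or as soon as maxBatchSize requests
        // are buffered, whichever comes first.
        _requests
            .Buffer(maxDelay, maxBatchSize)
            .Where(batch => batch.Count > 0)
            .Subscribe(async batch =>
            {
                try
                {
                    TInput[] inputs = batch.Select(r => r.Input).ToArray();
                    TOutput[] results = await batchCall(inputs); // one network call per batch
                    for (int i = 0; i < batch.Count; i++)
                        batch[i].Tcs.TrySetResult(results[i]);
                }
                catch (Exception ex)
                {
                    foreach (var r in batch) r.Tcs.TrySetException(ex);
                }
            });
    }

    // Each caller awaits its own result; many concurrent callers share
    // a single batched call to the internal service.
    public Task<TOutput> GetAsync(TInput input)
    {
        var request = new PendingRequest { Input = input };
        _requests.OnNext(request);
        return request.Tcs.Task;
    }
}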
Update:
Here is the helpful sample code provided by Theodor Zoulias:
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class Program
{
    public static async Task Main()
    {
        var execution = new BatchExecution<int, int>(async (int[] inputs) =>
        {
            Print($"Processing [{String.Join(", ", inputs)}]");
            await Task.Yield();
            return inputs.Select(x => x * 10).ToArray();
        }, batchSize: 3);

        Task[] workers = Enumerable.Range(1, 10).Select(id => Task.Run(async () =>
        {
            //await Task.Delay(id * 50);
            Print($"Before InvokeAsync({id})");
            var result = await execution.InvokeAsync(id);
            Print($"After await InvokeAsync({id}), result is {result}");
        })).ToArray();

        await Task.WhenAll(workers);
    }

    static void Print(string line)
    {
        Console.WriteLine($@"{DateTime.Now:HH:mm:ss.fff} [{Thread.CurrentThread
            .ManagedThreadId}] > {line}");
    }

    public class BatchExecution<TInput, TOutput>
    {
        private class AsyncOperation : TaskCompletionSource<TOutput>
        {
            public AsyncOperation() :
                base(TaskCreationOptions.RunContinuationsAsynchronously) { }

            public TInput Input { get; init; }
        }

        private readonly BatchBlock<AsyncOperation> _batchBlock;
        private readonly ActionBlock<AsyncOperation[]> _actionBlock;

        public BatchExecution(
            Func<TInput[], Task<TOutput[]>> batchAction,
            int batchSize,
            int maximumConcurrency = DataflowBlockOptions.Unbounded)
        {
            _batchBlock = new BatchBlock<AsyncOperation>(batchSize);
            _actionBlock = new ActionBlock<AsyncOperation[]>(async operations =>
            {
                try
                {
                    TInput[] inputs = operations.Select(x => x.Input).ToArray();
                    TOutput[] results = await batchAction(inputs);
                    if (results.Length != inputs.Length)
                        throw new InvalidOperationException("Results count mismatch.");
                    for (int i = 0; i < operations.Length; i++)
                        operations[i].SetResult(results[i]);
                }
                catch (OperationCanceledException oce)
                {
                    Array.ForEach(operations, x => x.TrySetCanceled(oce.CancellationToken));
                }
                catch (Exception ex)
                {
                    Array.ForEach(operations, x => x.TrySetException(ex));
                }
            }, new() { MaxDegreeOfParallelism = maximumConcurrency });

            _batchBlock.LinkTo(_actionBlock, new() { PropagateCompletion = true });
        }

        public Task<TOutput> InvokeAsync(TInput input)
        {
            var operation = new AsyncOperation() { Input = input };
            bool accepted = _batchBlock.Post(operation);
            if (!accepted) throw new InvalidOperationException(
                "The component has been marked as complete.");
            return operation.Task;
        }

        public void Complete() => _batchBlock.Complete();

        public Task Completion => _actionBlock.Completion;
    }
}
I would like some feedback/advice on how I'm approaching this: is it possible to do what I want with Reactive/TPL and HttpClient, or is there a better way?
Here is a BatchExecution class that accepts individual requests, and invokes a batch action when the number of stored requests reaches a specified number (batchSize). The results of the batch action are propagated back to the associated individual requests:
public class BatchExecution<TInput, TOutput>
{
    private class AsyncOperation : TaskCompletionSource<TOutput>
    {
        public AsyncOperation() :
            base(TaskCreationOptions.RunContinuationsAsynchronously) { }

        public TInput Input { get; init; }
    }

    private readonly BatchBlock<AsyncOperation> _batchBlock;
    private readonly ActionBlock<AsyncOperation[]> _actionBlock;

    public BatchExecution(
        Func<TInput[], Task<TOutput[]>> batchAction,
        int batchSize,
        int maximumConcurrency = DataflowBlockOptions.Unbounded)
    {
        _batchBlock = new BatchBlock<AsyncOperation>(batchSize);
        _actionBlock = new ActionBlock<AsyncOperation[]>(async operations =>
        {
            try
            {
                TInput[] inputs = operations.Select(x => x.Input).ToArray();
                TOutput[] results = await batchAction(inputs);
                if (results.Length != inputs.Length)
                    throw new InvalidOperationException("Results count mismatch.");
                for (int i = 0; i < operations.Length; i++)
                    operations[i].SetResult(results[i]);
            }
            catch (OperationCanceledException oce)
            {
                Array.ForEach(operations, x => x.TrySetCanceled(oce.CancellationToken));
            }
            catch (Exception ex)
            {
                Array.ForEach(operations, x => x.TrySetException(ex));
            }
        }, new() { MaxDegreeOfParallelism = maximumConcurrency });

        _batchBlock.LinkTo(_actionBlock, new() { PropagateCompletion = true });
    }

    public Task<TOutput> InvokeAsync(TInput input)
    {
        var operation = new AsyncOperation() { Input = input };
        bool accepted = _batchBlock.Post(operation);
        if (!accepted) throw new InvalidOperationException(
            "The component has been marked as complete.");
        return operation.Task;
    }

    public void Complete() => _batchBlock.Complete();

    public Task Completion => _actionBlock.Completion;
}
Usage example. Let's assume that this internal service API exists:
Task<string[]> GetCityNamesAsync(int[] ids);
Then the BatchExecution could be initialized and used like this:
var batchExecution = new BatchExecution<int, string>(async (int[] ids) =>
{
    return await GetCityNamesAsync(ids);
}, batchSize: 10);

//...

string cityName = await batchExecution.InvokeAsync(13);
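Since in the question the individual requests come from concurrent Web API calls, the important part is that all of them share a single BatchExecution instance. A minimal sketch of how that could look in a Web API 2 controller; the controller name and the GetCityNamesAsync wrapper are placeholders, the wrapper being assumed to call the internal service's batch endpoint:

using System;
using System.Threading.Tasks;
using System.Web.Http;

public class CitiesController : ApiController
{
    // One shared instance for the whole application, so that concurrent
    // requests are funneled into the same batches (it could also be
    // registered as a singleton in your IoC container).
    private static readonly BatchExecution<int, string> CityNames =
        new BatchExecution<int, string>(ids => GetCityNamesAsync(ids), batchSize: 10);

    // GET api/cities/13
    public async Task<IHttpActionResult> Get(int id)
    {
        string cityName = await CityNames.InvokeAsync(id);
        return Ok(cityName);
    }

    // Placeholder for the HttpClient call to the internal batch endpoint.
    private static Task<string[]> GetCityNamesAsync(int[] ids)
    {
        throw new NotImplementedException();
    }
}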
You could consider customizing this class by replacing the standard BatchBlock<AsyncOperation> with a custom time-aware BatchBlock, like the one found in this question. A rough sketch of a simpler time-based flush follows below.
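Alternatively, a rough (untested) sketch of a simpler approach: add a TimeSpan maxDelay parameter to the BatchExecution constructor and, right after the LinkTo call, start a timer that periodically calls TriggerBatch on the BatchBlock, so that a partially filled batch is emitted after at most roughly maxDelay:

// Added at the end of the BatchExecution constructor, assuming an extra
// TimeSpan maxDelay parameter. TriggerBatch makes the BatchBlock emit the
// items it currently holds even if there are fewer than batchSize
// (it does nothing when the block is empty).
var timer = new System.Threading.Timer(
    _ => _batchBlock.TriggerBatch(), null, maxDelay, maxDelay);

// Stop the timer once the pipeline has completed.
_actionBlock.Completion.ContinueWith(_ => timer.Dispose());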