如何使用 asp.net web API 2(使用 TPL/Reactive)缓冲和批处理 REST get 查询?

How to buffer and batch REST get queries with asp.net web API 2 (using TPL/Reactive)?

我正在运行 public 正面 api 目前很难扩展。此 API 建立在 Asp.Net web API 2 (.Net 4.7.2) 之上。这个

我遇到的问题是,从这个入口点开始,服务必须多次调用其他内部服务以暴露其 Rest 接口。 我们做了一些优化:

但是当我们突然爆发负载时,或者当我们进行一些压力测试时,我们发现我们很难扩展:响应时间开始增加并且我们无法达到超过 150 req/s 平均响应时间为 2.5 秒,而所有时间似乎都花在等待每个内部服务响应的网络延迟上…… 所以我在想是否有可能缓冲一堆请求并批量调用内部 apis 以获取要组合的详细信息然后回答调用者。

我的想法是拥有一种特殊类型的静态 httpClient,它带有一个异步方法,可以对调用进行缓冲,并在有特殊数量的缓冲调用时或当 几毫秒的限制已经过去:这样当我们负载不足时,我们的 API 可以进行很少的网络调用并且响应更快...... 我知道有些人也使用 mom/bus 作为 Kafka 的例子,但在我看来,这样做只会让我们有更多的并行调用来处理,但没有真正的收获 关于速度..(我可能错了)


您认为这可以使用 Reactive(观察延迟时间或缓冲的消息数)/TPL Dataflow(以填充块然后进行批处理调用)来完成吗? 我有这个想法,但我不知道这是否是个好主意,以及如何让它发挥作用...

更新: 在这里找到 Theodor Zoulias 提供的有用示例代码:

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class Program
    public static async Task Main()
        var execution = new BatchExecution<int, int>(async (int[] inputs) =>
            Print($"Processing [{String.Join(", ", inputs)}]");
            await Task.Yield() ;
            return inputs.Select(x => x * 10).ToArray();
        }, batchSize: 3);

        Task[] workers = Enumerable.Range(1, 10).Select(id => Task.Run(async () =>
            //await Task.Delay(id * 50);
            Print($"Before InvokeAsync({id})");
            var result = await execution.InvokeAsync(id);
            Print($"After await InvokeAsync({id}), result is {result}");

        await Task.WhenAll(workers);

    static void Print(string line)
        Console.WriteLine($@"{DateTime.Now:HH:mm:ss.fff} [{Thread.CurrentThread
            .ManagedThreadId}] > {line}");

    public class BatchExecution<TInput, TOutput>
        private class AsyncOperation : TaskCompletionSource<TOutput>
            public AsyncOperation() :
            { }
            public TInput Input { get; init; }

        private readonly BatchBlock<AsyncOperation> _batchBlock;
        private readonly ActionBlock<AsyncOperation[]> _actionBlock;

        public BatchExecution(
            Func<TInput[], Task<TOutput[]>> batchAction,
            int batchSize,
            int maximumConcurrency = DataflowBlockOptions.Unbounded)
            _batchBlock = new BatchBlock<AsyncOperation>(batchSize);
            _actionBlock = new ActionBlock<AsyncOperation[]>(async operations =>
                    TInput[] inputs = operations.Select(x => x.Input).ToArray();
                    TOutput[] results = await batchAction(inputs);
                    if (results.Length != inputs.Length)
                        throw new InvalidOperationException("Results count mismatch.");
                    for (int i = 0; i < operations.Length; i++)
                catch (OperationCanceledException oce)
                    Array.ForEach(operations, x => x.TrySetCanceled(oce.CancellationToken));
                catch (Exception ex)
                    Array.ForEach(operations, x => x.TrySetException(ex));
            }, new() { MaxDegreeOfParallelism = maximumConcurrency });
            _batchBlock.LinkTo(_actionBlock, new() { PropagateCompletion = true });

        public Task<TOutput> InvokeAsync(TInput input)
            var operation = new AsyncOperation() { Input = input };
            bool accepted = _batchBlock.Post(operation);
            if (!accepted) throw new InvalidOperationException(
                "The component has been marked as complete.");
            return operation.Task;

        public void Complete() => _batchBlock.Complete();
        public Task Completion => _actionBlock.Completion;

我在做这件事的过程中需要一些 feedback/advice :是否可以用 Reactive/TPL 和 httpClient 做我想做的事情,或者有更好的方法吗?


public class BatchExecution<TInput, TOutput>
    private class AsyncOperation : TaskCompletionSource<TOutput>
        public AsyncOperation() :
            base(TaskCreationOptions.RunContinuationsAsynchronously) { }
        public TInput Input { get; init; }

    private readonly BatchBlock<AsyncOperation> _batchBlock;
    private readonly ActionBlock<AsyncOperation[]> _actionBlock;

    public BatchExecution(
        Func<TInput[], Task<TOutput[]>> batchAction,
        int batchSize,
        int maximumConcurrency = DataflowBlockOptions.Unbounded)
        _batchBlock = new BatchBlock<AsyncOperation>(batchSize);
        _actionBlock = new ActionBlock<AsyncOperation[]>(async operations =>
                TInput[] inputs = operations.Select(x => x.Input).ToArray();
                TOutput[] results = await batchAction(inputs);
                if (results.Length != inputs.Length)
                    throw new InvalidOperationException("Results count mismatch.");
                for (int i = 0; i < operations.Length; i++)
            catch (OperationCanceledException oce)
                Array.ForEach(operations, x => x.TrySetCanceled(oce.CancellationToken));
            catch (Exception ex)
                Array.ForEach(operations, x => x.TrySetException(ex));
        }, new() { MaxDegreeOfParallelism = maximumConcurrency });
        _batchBlock.LinkTo(_actionBlock, new() { PropagateCompletion = true });

    public Task<TOutput> InvokeAsync(TInput input)
        var operation = new AsyncOperation() { Input = input };
        bool accepted = _batchBlock.Post(operation);
        if (!accepted) throw new InvalidOperationException(
            "The component has been marked as complete.");
        return operation.Task;

    public void Complete() => _batchBlock.Complete();
    public Task Completion => _actionBlock.Completion;

使用示例。让我们假设这个内部服务存在 API:

Task<string[]> GetCityNamesAsync(int[] ids);

然后 BatchExecution 可以像这样初始化和使用:

var batchExecution = new BatchExecution<int, string>(async (int[] ids) =>
    return await GetCityNamesAsync(ids);
}, batchSize: 10);

string cityName = await batchExecution.InvokeAsync(13);

您可以考虑通过替换标准 BatchBlock<AsyncOperation> with a custom time-aware BatchBlock, like the one found in this 问题来自定义 class。