通过 IEnumerable 和 TPL 数据流流式传输数据

Streaming data via IEnumerable & TPL Dataflow

我正在从上游 API 获取项目,这非常慢。我尝试通过使用 TPL 数据流创建多个连接并将它们组合在一起来加快速度,就像这样;

class Stuff
{
    int Id { get; }
}

async Task<Stuff> GetStuffById(int id) => throw new NotImplementedException();

async Task<IEnumerable<Stuff>> GetLotsOfStuff(IEnumerable<int> ids)
{
    var bagOfStuff = new ConcurrentBag<Stuff>();

    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 5
    };

    var processor = new ActionBlock<int>(async id =>
    {
        bagOfStuff.Add(await GetStuffById(id));
    }, options);

    foreach (int id in ids)
    {
        processor.Post(id);
    }

    processor.Complete();
    await processor.Completion;

    return bagOfStuff.ToArray();
}

问题是我必须等到我查询完 Stuff 的整个集合后才能 return 给调用者。我更喜欢的是,每当多个并行查询中的任何一个 return 是一个项目时,我都会以 yield return 的方式 return 该项目。因此我不需要 return 和 sync Task<IEnumerable<Stuff>>,我可以 return 和 IEnumerable<Stuff> 并且调用者在任何项目 return 后立即推进迭代。

我试过这样做;

IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 5
    };

    var processor = new ActionBlock<int>(async id =>
    {
        yield return await GetStuffById(id);
    }, options);

    foreach (int id in ids)
    {
        processor.Post(id);
    }

    processor.Complete();
    processor.Completion.Wait();

    yield break;
}

但是我得到一个错误

The yield statement cannot be used inside an anonymous method or lambda expression

如何重组我的代码?

根据您的具体用例,您可能有几种不同的处理方法。但是要根据 TPL-Dataflow 处理项目,您需要将源块更改为 TransformBlock<,> 并将项目流到另一个块以处理您的项目。请注意,现在您可以摆脱收集 ConcurrentBag 并且如果您不关心您收到物品的顺序,请务必将 EnsureOrdered 设置为 false。此外 link块并传播完成,以确保您的管道在检索到所有项目并随后进行处理后完成。

class Stuff
{
    int Id { get; }
}

public class GetStuff
{
    async Task<Stuff> GetStuffById(int id) => throw new NotImplementedException();

    async Task GetLotsOfStuff(IEnumerable<int> ids)
    {
        //var bagOfStuff = new ConcurrentBag<Stuff>();

        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
            EnsureOrdered = false
        };

        var processor = new TransformBlock<int, Stuff>(id => GetStuffById(id), options);

        var handler = new ActionBlock<Stuff>(s => throw new NotImplementedException());

        processor.LinkTo(handler, new DataflowLinkOptions() { PropagateCompletion = true });

        foreach (int id in ids)
        {
            processor.Post(id);
        }

        processor.Complete();
        await handler.Completion;
    }
}

其他选项可以使您的方法成为可观察的流出 TransformBlock 或使用 IAsyncEnumerableyield return 和异步获取方法。

您可以 return 一个 IEnumerable,但要这样做您必须阻塞当前线程。您需要一个 TransformBlock 来处理 id,以及一个将异步提供带有 id 的 TransformBlock 的馈送任务。最后当前线程会进入一个阻塞循环,等待produced stuff yield:

static IEnumerable<Stuff> GetLotsOfStuff(IEnumerable<int> ids)
{
    using var completionCTS = new CancellationTokenSource();

    var processor = new TransformBlock<int, Stuff>(async id =>
    {
        return await GetStuffById(id);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 5,
        BoundedCapacity = 50, // Avoid buffering millions of ids
        CancellationToken = completionCTS.Token
    });

    var feederTask = Task.Run(async () =>
    {
        try
        {
            foreach (int id in ids)
                if (!await processor.SendAsync(id)) break;
        }
        finally { processor.Complete(); }
    });

    try
    {
        while (processor.OutputAvailableAsync().Result)
            while (processor.TryReceive(out var stuff))
                yield return stuff;
    }
    finally // This runs when the caller exits the foreach loop
    {
        completionCTS.Cancel(); // Cancel the TransformBlock if it's still running
    }

    Task.WaitAll(feederTask, processor.Completion); // Propagate all exceptions
}

不需要ConcurrentBag,因为TransformBlock有一个内部输出缓冲区。棘手的部分是处理调用者将通过提前中断或被异常阻止而放弃 IEnumerable<Stuff> 的枚举的情况。在这种情况下,您不希望 feeder-task 一直使用 id 发送 IEnumerable<int> 直到结束。还好there is a solution。将 yielding 循环包含在 try/finally 块中允许接收此事件的通知,以便可以及时终止 feeder-task。

另一种实现可以通过在单个循环中组合抽取 id、提供块和生成内容来消除对 feeder-task 的需求。在这种情况下,您会希望泵送和产量之间存在滞后。要实现它,MoreLinq's Lag (or Lead) 扩展方法可能会很方便。


更新: 这是一个不同的实现,它在同一个循环中枚举和产生。为了实现所需的滞后,可枚举源右填充一些虚拟元素,数量与并发度相等。

此实现接受泛型,而不是 intStuff

public static IEnumerable<TResult> Transform<TSource, TResult>(
    IEnumerable<TSource> source, Func<TSource, Task<TResult>> taskFactory,
    int degreeOfConcurrency)
{
    var processor = new TransformBlock<TSource, TResult>(async item =>
    {
        return await taskFactory(item);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = degreeOfConcurrency
    });

    var paddedSource = source.Select(item => (item, true))
        .Concat(Enumerable.Repeat((default(TSource), false), degreeOfConcurrency));
    int index = -1;
    bool completed = false;
    foreach (var (item, hasValue) in paddedSource)
    {
        index++;
        if (hasValue) { processor.Post(item); }
        else if (!completed) { processor.Complete(); completed = true; }
        if (index >= degreeOfConcurrency)
        {
            if (!processor.OutputAvailableAsync().Result) break; // Blocking call
            if (!processor.TryReceive(out var result))
                throw new InvalidOperationException(); // Should never happen
            yield return result;
        }
    }
    processor.Completion.Wait();
}

用法示例:

IEnumerable<Stuff> lotsOfStuff = Transform(ids, GetStuffById, 5);

两种实现都可以简单地修改为 return 和 IAsyncEnumerable 而不是 IEnumerable,以避免阻塞调用线程。