MapReduce using DataFlow library

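I am trying to implement the classic map-reduce problem using System.Threading.Tasks.Dataflow, and although I can get something (sort of) working, I am struggling to see how to generalize this functionality.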

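Take a simple problem, such as the one in the test code below: fan out the integers 1..N, square each one, add 5, divide by 2, and sum the results.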

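The issue I have is that I can get this working using a BatchBlock, but I have to specify the initial size of the set of parallel tasks. That is fine for the test code (below) because I know up front how many items will be queued, but say I didn't know... how would I set up this pipeline?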

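Test code used (note that I added a short delay in the first "parallel" block just to see some differences in processing time depending on the degree of parallelism):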

using System.Diagnostics;
using System.Threading.Tasks.Dataflow;

var input = 10;

var fanOutBlock = new TransformManyBlock<int, int>(x =>
{
    return Enumerable.Range(1, x);
});

var squareBlock = new TransformBlock<int, int>(async x =>
{
    await Task.Delay(100);
    return x * x;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

var addFiveBlock = new TransformBlock<int, int>(x =>
{
    return x + 5;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

var divTwoBlock = new TransformBlock<int, double>(x =>
{
    return x/2.0;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

var batchBlock = new BatchBlock<double>(input);

var sumBlock = new TransformBlock<IList<double>,double>(x =>
{
    return x.Sum();
});

var options = new DataflowLinkOptions { PropagateCompletion = true };

fanOutBlock.LinkTo(squareBlock, options);
squareBlock.LinkTo(addFiveBlock, options);
addFiveBlock.LinkTo(divTwoBlock, options);
divTwoBlock.LinkTo(batchBlock, options);
batchBlock.LinkTo(sumBlock, options);


var sw = Stopwatch.StartNew();
fanOutBlock.Post(input);
fanOutBlock.Complete();


var result = sumBlock.Receive();
Console.WriteLine(result);
sw.Stop();
Console.WriteLine($"{sw.ElapsedMilliseconds}ms");

await sumBlock.Completion;

One idea is to configure the BatchBlock<T> with the maximum batchSize:

var batchBlock = new BatchBlock<double>(Int32.MaxValue);

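When the batchBlock completes (when its Complete method is called), it will emit a single batch containing all the messages. The drawback is that by buffering every message you may run out of memory if the number of messages is huge. Or, if the number of messages is larger than Int32.MaxValue and by some miracle you don't run out of memory, you will get more than one batch, which, given the logic you are trying to implement, is a bug.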
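A minimal sketch of this approach, reduced to the last two stages: the batch block is created with Int32.MaxValue, so nothing is emitted until Complete is called, at which point the buffered values come out as one batch. The three posted values are illustrative stand-ins for the output of divTwoBlock.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Batch size is effectively unbounded: no batch is emitted until Complete() is called.
var batchBlock = new BatchBlock<double>(Int32.MaxValue);

// BatchBlock<T> emits T[] batches; sum each batch it produces.
var sumBlock = new TransformBlock<double[], double>(batch => batch.Sum());

batchBlock.LinkTo(sumBlock, new DataflowLinkOptions { PropagateCompletion = true });

// Illustrative inputs; in the question these would come from divTwoBlock.
foreach (var value in new[] { 3.0, 4.5, 7.0 })
    batchBlock.Post(value);

// Completing the batch block releases everything buffered so far as one final batch.
batchBlock.Complete();

double total = await sumBlock.ReceiveAsync();
await sumBlock.Completion;
Console.WriteLine(total);
```

Every message stays in the batch block's buffer until the end, which is exactly the memory cost described above.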

A better idea is to implement a custom dataflow block that aggregates the messages it receives on the fly, similar to the Aggregate LINQ operator:

public static TResult Aggregate<TSource, TAccumulate, TResult>(
    this IEnumerable<TSource> source,
    TAccumulate seed,
    Func<TAccumulate, TSource, TAccumulate> function,
    Func<TAccumulate, TResult> resultSelector);
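For comparison, this is how that LINQ overload behaves on an in-memory sequence: the seed is threaded through function once per element, and resultSelector runs once at the end. The (Sum, Count) accumulator used to compute an average is just an illustrative choice.

```csharp
using System;
using System.Linq;

var values = new[] { 2.0, 4.0, 9.0 };

// seed = (0, 0); function folds each item into (Sum, Count);
// resultSelector turns the final accumulator into the result exactly once.
double average = values.Aggregate(
    (Sum: 0.0, Count: 0),
    (acc, x) => (acc.Sum + x, acc.Count + 1),
    acc => acc.Sum / acc.Count);

Console.WriteLine(average);
```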

Here is an implementation consisting of two native blocks, encapsulated with the DataflowBlock.Encapsulate method:

public static IPropagatorBlock<TSource, TResult>
    CreateAggregateBlock<TSource, TAccumulate, TResult>(
    TAccumulate seed,
    Func<TAccumulate, TSource, TAccumulate> function,
    Func<TAccumulate, TResult> resultSelector,
    ExecutionDataflowBlockOptions options = default)
{
    options ??= new ExecutionDataflowBlockOptions();
    var maxDOP = options.MaxDegreeOfParallelism;
    options.MaxDegreeOfParallelism = 1;

    var inputBlock = new ActionBlock<TSource>(item =>
    {
        seed = function(seed, item);
    }, options);

    var outputBlock = new TransformBlock<TAccumulate, TResult>(accumulate =>
    {
        return resultSelector(accumulate);
    }, options);

    options.MaxDegreeOfParallelism = maxDOP; // Restore initial value

    PropagateCompletion(inputBlock, outputBlock, () =>
    {
        outputBlock.Post(seed);
    });

    return DataflowBlock.Encapsulate(inputBlock, outputBlock);

    static void PropagateCompletion(IDataflowBlock source, IDataflowBlock target,
        Action onSuccessfulCompletion)
    {
        ThreadPool.QueueUserWorkItem(async _ =>
        {
            try { await source.Completion; } catch { }
            Exception exception =
                source.Completion.IsFaulted ? source.Completion.Exception : null;
            if (source.Completion.IsCompletedSuccessfully)
            {
                // The action is invoked before completing the target.
                try { onSuccessfulCompletion(); }
                catch (Exception ex) { exception = ex; }
            }
            if (exception != null) target.Fault(exception); else target.Complete();
        });
    }
}

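A tricky part is how to propagate the completion of one block to another. My preferred technique is to invoke an async void method on the thread pool. This way, any bug in my code will surface as an unhandled exception that crashes the process. The alternative is to put the code in a fire-and-forget task continuation, in which case the most likely effect of a bug is a silent deadlock.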
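To make the deadlock risk concrete, here is a sketch of the alternative being warned against: the completion logic lives in a ContinueWith continuation whose task is discarded, and a deliberately injected bug throws before target.Complete() is reached. The exception is captured by the discarded task instead of crashing the process, so the target simply never completes. The blocks and the injected bug are illustrative only.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var source = new ActionBlock<int>(_ => { });
var target = new BufferBlock<int>();

// Fire-and-forget continuation: the Task returned by ContinueWith is discarded,
// so an exception thrown inside it is stored in that task and never observed.
_ = source.Completion.ContinueWith(antecedent =>
{
    int[] results = Array.Empty<int>();
    target.Post(results[0]); // injected bug: throws IndexOutOfRangeException
    target.Complete();       // never reached
}, TaskScheduler.Default);

source.Complete();

// Wait up to one second for the target to complete.
var first = await Task.WhenAny(target.Completion, Task.Delay(1000));
bool targetCompleted = first == target.Completion;
Console.WriteLine(targetCompleted ? "target completed" : "target still pending: silent hang");
```

With the thread-pool async void approach, the same bug would be caught and forwarded through target.Fault, and a bug outside the try/catch would crash the process rather than hang it.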

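Another open question is whether the mutations of the seed state are visible to all threads involved in the computation. I've avoided adding explicit barriers or a lock, and I rely on the implicit barriers that the TPL includes when a task is queued, and at the beginning/end of a task's execution.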

Usage example:

var sumBlock = CreateAggregateBlock<double, double, double>(0.0,
    (acc, x) => acc + x, acc => acc);
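Putting it together, here is a sketch of the question's pipeline with the BatchBlock/sum pair replaced by the aggregate block, so the number of items no longer has to be known in advance. To keep it self-contained, CreateAggregateBlock is repeated as a condensed local function of a top-level program (same logic as above), and the Task.Delay from the question is omitted.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var link = new DataflowLinkOptions { PropagateCompletion = true };
var parallel = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 };

var fanOutBlock = new TransformManyBlock<int, int>(n => Enumerable.Range(1, n));
var squareBlock = new TransformBlock<int, int>(x => x * x, parallel);
var addFiveBlock = new TransformBlock<int, int>(x => x + 5, parallel);
var divTwoBlock = new TransformBlock<int, double>(x => x / 2.0, parallel);

// Replaces BatchBlock + sum: no batch size has to be chosen up front.
var sumBlock = CreateAggregateBlock<double, double, double>(0.0,
    (acc, x) => acc + x, acc => acc);

fanOutBlock.LinkTo(squareBlock, link);
squareBlock.LinkTo(addFiveBlock, link);
addFiveBlock.LinkTo(divTwoBlock, link);
divTwoBlock.LinkTo(sumBlock, link);

fanOutBlock.Post(10);
fanOutBlock.Complete();

double result = await sumBlock.ReceiveAsync();
await sumBlock.Completion;
Console.WriteLine(result);

// Condensed copy of the CreateAggregateBlock helper shown above.
static IPropagatorBlock<TSource, TResult> CreateAggregateBlock<TSource, TAccumulate, TResult>(
    TAccumulate seed,
    Func<TAccumulate, TSource, TAccumulate> function,
    Func<TAccumulate, TResult> resultSelector,
    ExecutionDataflowBlockOptions options = null)
{
    options ??= new ExecutionDataflowBlockOptions();
    var maxDOP = options.MaxDegreeOfParallelism;
    options.MaxDegreeOfParallelism = 1; // the accumulator is updated by one item at a time

    var inputBlock = new ActionBlock<TSource>(item => { seed = function(seed, item); }, options);
    var outputBlock = new TransformBlock<TAccumulate, TResult>(resultSelector, options);
    options.MaxDegreeOfParallelism = maxDOP; // restore the caller's value

    // When the input completes successfully, post the final accumulator, then complete the output.
    PropagateCompletion(inputBlock, outputBlock, () => outputBlock.Post(seed));
    return DataflowBlock.Encapsulate(inputBlock, outputBlock);

    static void PropagateCompletion(IDataflowBlock source, IDataflowBlock target,
        Action onSuccessfulCompletion)
    {
        ThreadPool.QueueUserWorkItem(async _ =>
        {
            try { await source.Completion; } catch { }
            Exception exception =
                source.Completion.IsFaulted ? source.Completion.Exception : null;
            if (source.Completion.IsCompletedSuccessfully)
            {
                try { onSuccessfulCompletion(); }
                catch (Exception ex) { exception = ex; }
            }
            if (exception != null) target.Fault(exception); else target.Complete();
        });
    }
}
```

The aggregate block only emits its result after divTwoBlock completes, so completion has to flow all the way from fanOutBlock; that is why every link uses PropagateCompletion = true.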