MapReduce using DataFlow library
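I am trying to implement the classic map-reduce problem using System.Threading.Tasks.Dataflow, and although I can get something (sort of) working, I'm struggling to see how to generalise this functionality.
Given a simple problem:
- Produce a stream of integers, and in parallel for each number:
- Square the number
- Add 5
- Divide by 2
- Take the sum of all the numbers
The issue I'm having is that I can get it working using a BatchBlock, but I have to specify the initial size of the set of parallel tasks. This is fine for the test code (below), as I know upfront how many items I'm going to queue, but say I didn't know... how would I set this pipeline up?
Test code used (note that I added a short delay to the first of the "parallel" blocks just to see some difference in processing time depending on the degree of parallelism):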
using System.Diagnostics;
using System.Threading.Tasks.Dataflow;
var input = 10;
var fanOutBlock = new TransformManyBlock<int, int>(x =>
{
return Enumerable.Range(1, x); // emits 1..x as separate messages
});
var squareBlock = new TransformBlock<int, int>(async x =>
{
await Task.Delay(100);
return x * x;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
var addFiveBlock = new TransformBlock<int, int>(x =>
{
return x + 5;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
var divTwoBlock = new TransformBlock<int, double>(x =>
{
return x/2.0;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
var batchBlock = new BatchBlock<double>(input);
var sumBlock = new TransformBlock<IList<double>,double>(x =>
{
return x.Sum();
});
var options = new DataflowLinkOptions { PropagateCompletion = true };
fanOutBlock.LinkTo(squareBlock, options);
squareBlock.LinkTo(addFiveBlock, options);
addFiveBlock.LinkTo(divTwoBlock, options);
divTwoBlock.LinkTo(batchBlock, options);
batchBlock.LinkTo(sumBlock, options);
var sw = Stopwatch.StartNew();
fanOutBlock.Post(input);
fanOutBlock.Complete();
var result = sumBlock.Receive();
Console.WriteLine(result);
sw.Stop();
Console.WriteLine($"{sw.ElapsedMilliseconds}ms");
await sumBlock.Completion;
One idea is to configure the BatchBlock<T> with the maximum batchSize:
var batchBlock = new BatchBlock<double>(Int32.MaxValue);
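When the batchBlock is completed (when its Complete method is invoked), it will emit a single batch containing all the messages it holds. The disadvantage is that, by buffering every message, you might run out of memory if the number of messages is huge. Or, if the number of messages is larger than Int32.MaxValue and miraculously you don't run out of memory, you'll get more than one batch, which is a bug with regard to the logic that you are trying to implement.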
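The flush-on-completion behaviour described above can be checked with a small sketch. The three sample values and the variable names are illustrative assumptions, not part of the question's code. BatchBlock<T> outputs arrays (T[]), so the summing block here takes double[].

```csharp
using System;
using System.Linq;
using System.Threading.Tasks.Dataflow;

// The batch size is far larger than the number of posted items, so no batch
// is emitted while items arrive; everything is buffered in memory.
var batchBlock = new BatchBlock<double>(Int32.MaxValue);
var sumBlock = new TransformBlock<double[], double>(batch => batch.Sum());
batchBlock.LinkTo(sumBlock, new DataflowLinkOptions { PropagateCompletion = true });

foreach (var value in new[] { 1.0, 2.0, 3.0 })
{
    batchBlock.Post(value);
}

// Completing the block flushes the buffered items as one final, smaller batch.
batchBlock.Complete();

double total = sumBlock.Receive(); // 1 + 2 + 3 = 6
Console.WriteLine(total);
```

The memory concern is visible in the structure: nothing reaches sumBlock until Complete is called, so every message is held in the buffer until then.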
A better idea is to implement a custom Dataflow block that aggregates the messages it receives on the fly. Something similar to the Aggregate LINQ operator:
public static TResult Aggregate<TSource, TAccumulate, TResult>(
this IEnumerable<TSource> source,
TAccumulate seed,
Func<TAccumulate, TSource, TAccumulate> function,
Func<TAccumulate, TResult> resultSelector);
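As a reminder of how this operator behaves on an ordinary in-memory sequence, here is a minimal sketch that runs the question's calculation (square, add 5, divide by 2, sum) with plain LINQ. The range 1 to 10 mirrors the question's test input; no Dataflow is involved, and the variable name is an assumption made for illustration.

```csharp
using System;
using System.Linq;

// seed = 0.0, the accumulator function adds each mapped value,
// and the result selector returns the accumulator unchanged.
double linqTotal = Enumerable.Range(1, 10)
    .Select(x => (x * x + 5) / 2.0)
    .Aggregate(0.0, (acc, x) => acc + x, acc => acc);

// (385 + 50) / 2 = 217.5
Console.WriteLine(linqTotal);
```

The operator only ever holds one accumulator value, which is the property a streaming aggregate block needs in order to avoid buffering every message.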
Here is an implementation of such a block. It is composed of two native blocks that are encapsulated with the DataflowBlock.Encapsulate method:
public static IPropagatorBlock<TSource, TResult>
CreateAggregateBlock<TSource, TAccumulate, TResult>(
TAccumulate seed,
Func<TAccumulate, TSource, TAccumulate> function,
Func<TAccumulate, TResult> resultSelector,
ExecutionDataflowBlockOptions options = default)
{
options ??= new ExecutionDataflowBlockOptions();
var maxDOP = options.MaxDegreeOfParallelism;
options.MaxDegreeOfParallelism = 1;
var inputBlock = new ActionBlock<TSource>(item =>
{
seed = function(seed, item);
}, options);
var outputBlock = new TransformBlock<TAccumulate, TResult>(accumulate =>
{
return resultSelector(accumulate);
}, options);
options.MaxDegreeOfParallelism = maxDOP; // Restore initial value
PropagateCompletion(inputBlock, outputBlock, () =>
{
outputBlock.Post(seed);
});
return DataflowBlock.Encapsulate(inputBlock, outputBlock);
static void PropagateCompletion(IDataflowBlock source, IDataflowBlock target,
Action onSuccessfulCompletion)
{
ThreadPool.QueueUserWorkItem(async _ =>
{
try { await source.Completion; } catch { }
Exception exception =
source.Completion.IsFaulted ? source.Completion.Exception : null;
if (source.Completion.IsCompletedSuccessfully)
{
// The action is invoked before completing the target.
try { onSuccessfulCompletion(); }
catch (Exception ex) { exception = ex; }
}
if (exception != null) target.Fault(exception); else target.Complete();
});
}
}
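One tricky part is how to propagate the completion of one block to the other. My preferred technique is to invoke an async void method on the thread pool. This way, any bug in my code will be exposed as a crashing unhandled exception. The alternative is to put the code in a fire-and-forget task continuation, in which case the effect of a bug will most likely be a silent deadlock.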
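Another question mark is whether the mutations of the seed state are visible to all threads involved in the calculation. I have avoided adding explicit barriers or a lock, and I am relying on the implicit memory barriers that the TPL includes when tasks are queued and at the beginning/end of task execution.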
Usage example:
var sumBlock = CreateAggregateBlock<double, double, double>(0.0,
(acc, x) => acc + x, acc => acc);
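To show how this sumBlock could replace the batchBlock/sumBlock pair from the question, here is an end-to-end sketch. To stay self-contained it includes a condensed copy of CreateAggregateBlock written as a local function. That copy is an assumption-laden simplification: it drops the options parameter and inlines the completion propagation, so treat it as a stand-in rather than the implementation above. The artificial delay from the question is also omitted.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var parallel = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 };
var fanOutBlock = new TransformManyBlock<int, int>(n => Enumerable.Range(1, n));
var squareBlock = new TransformBlock<int, int>(x => x * x, parallel);
var addFiveBlock = new TransformBlock<int, int>(x => x + 5, parallel);
var divTwoBlock = new TransformBlock<int, double>(x => x / 2.0, parallel);
// No batch size anywhere: items are folded into the accumulator as they arrive.
var sumBlock = CreateAggregateBlock<double, double, double>(
    0.0, (acc, x) => acc + x, acc => acc);

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
fanOutBlock.LinkTo(squareBlock, linkOptions);
squareBlock.LinkTo(addFiveBlock, linkOptions);
addFiveBlock.LinkTo(divTwoBlock, linkOptions);
divTwoBlock.LinkTo(sumBlock, linkOptions);

fanOutBlock.Post(10);
fanOutBlock.Complete();
double result = await sumBlock.ReceiveAsync(); // (385 + 50) / 2 = 217.5
await sumBlock.Completion;
Console.WriteLine(result);

// Condensed stand-in for the aggregate block (simplified for illustration).
static IPropagatorBlock<TSource, TResult> CreateAggregateBlock<TSource, TAccumulate, TResult>(
    TAccumulate seed,
    Func<TAccumulate, TSource, TAccumulate> function,
    Func<TAccumulate, TResult> resultSelector)
{
    // ActionBlock defaults to MaxDegreeOfParallelism = 1, so seed is updated sequentially.
    var inputBlock = new ActionBlock<TSource>(item => { seed = function(seed, item); });
    var outputBlock = new TransformBlock<TAccumulate, TResult>(resultSelector);

    // async void on the thread pool, following the technique described in the answer.
    ThreadPool.QueueUserWorkItem(async _ =>
    {
        try { await inputBlock.Completion; } catch { }
        var completion = inputBlock.Completion;
        // Emit the accumulated value only if the input side finished without error.
        if (completion.IsCompletedSuccessfully) outputBlock.Post(seed);
        if (completion.IsFaulted) ((IDataflowBlock)outputBlock).Fault(completion.Exception);
        else outputBlock.Complete();
    });

    return DataflowBlock.Encapsulate(inputBlock, outputBlock);
}
```

Because the last link uses PropagateCompletion, completing fanOutBlock eventually completes the input side of the aggregate block, which is what triggers the single output message.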