如何更改现有 TPL 块的 MaxDegreeOfParallelism?
How to change the MaxDegreeOfParallelism of an existing TPL block?
我已经使用 TPL 数据流创建了一个小型管道。它由链接到 ActionBlock
的 TransformBlock
组成。这是设置的样子:
var transformBlock = new TransformBlock<int, string>(async num=>
{
// Do stuff.
return num.ToString();
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
});
var actionBlock = new ActionBlock<string>(text=>
{
Console.WriteLine(text);
});
transformBlock.LinkTo(actionBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
我想更改变换块实例的 MaxDegreeOfParallelism
。我还没有找到这样做的方法,因为没有 public 属性 允许我更改它。有办法吗?
根据 MSDN:
Dataflow block capture the state of the options at their construction. Subsequent changes to the provided ExecutionDataflowBlockOptions instance should not affect the behavior of a dataflow block.
所以,回答你的问题,
I would like to alter the MaxDegreeOfParallelism
of the transform block instance.
它不是为了让现有的块改变它的选项。您应该创建新的或创建新的管道,或者从一开始就调整此设置以获得最大利润。
我试图通过使用夹在两个 BufferBlock
之间的可互换 TransformBlock
来解决这个问题。每次 MaxDegreeOfParallelism
改变时,旧的 TransformBlock
被分离,标记为完成,等待它完成,然后一个新的 TransformBlock
被附加到它的位置。此解决方案有两个缺点:
在更换中间块的间歇阶段,并行度逐渐降为零。
中间块的 BoundedCapacity
必须设置为等于 MaxDegreeOfParallelism
,这可能会影响非常精细的工作负载的性能。
这里是 CreateVariableDopPropagator
方法,它的同伴 VariableDopExecutionDataflowBlockOptions
class:
public class VariableDopExecutionDataflowBlockOptions : ExecutionDataflowBlockOptions
{
private readonly object _locker = new object();
public event EventHandler MaxDegreeOfParallelismChanged;
/// <summary>Gets the maximum number of messages that may be processed by the
/// block concurrently.</summary>
public new int MaxDegreeOfParallelism
{
get { lock (_locker) return base.MaxDegreeOfParallelism; }
set
{
bool changed;
lock (_locker)
{
changed = value != base.MaxDegreeOfParallelism;
base.MaxDegreeOfParallelism = value;
}
if (changed) MaxDegreeOfParallelismChanged?.Invoke(this, EventArgs.Empty);
}
}
}
public static IPropagatorBlock<TInput, TOutput>
CreateVariableDopPropagator<TInput, TOutput>(
Func<TInput, Task<TOutput>> transform,
VariableDopExecutionDataflowBlockOptions dataflowBlockOptions)
{
if (transform == null) throw new ArgumentNullException(nameof(transform));
if (dataflowBlockOptions == null)
throw new ArgumentNullException(nameof(dataflowBlockOptions));
var optionsCopy = new ExecutionDataflowBlockOptions()
{
BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
CancellationToken = dataflowBlockOptions.CancellationToken,
EnsureOrdered = dataflowBlockOptions.EnsureOrdered,
MaxDegreeOfParallelism = dataflowBlockOptions.MaxDegreeOfParallelism,
MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask,
NameFormat = dataflowBlockOptions.NameFormat,
SingleProducerConstrained = dataflowBlockOptions.SingleProducerConstrained,
TaskScheduler = dataflowBlockOptions.TaskScheduler,
};
var locker = new object();
var input = new BufferBlock<TInput>(optionsCopy);
var output = new BufferBlock<TOutput>(optionsCopy);
PropagateFailure(output, input);
var propagateCompletion = new DataflowLinkOptions() { PropagateCompletion = true };
TransformBlock<TInput, TOutput> middle;
IDisposable link1 = null;
IDisposable link2 = null;
CreateMiddleBlock();
dataflowBlockOptions.MaxDegreeOfParallelismChanged += OnMaxDopChanged;
OnCompletion(output, () =>
{
dataflowBlockOptions.MaxDegreeOfParallelismChanged -= OnMaxDopChanged;
}, null);
return DataflowBlock.Encapsulate(input, output);
void CreateMiddleBlock()
{
IDataflowBlock localMiddle;
lock (locker)
{
link1?.Dispose();
link2?.Dispose();
var maxDop = dataflowBlockOptions.MaxDegreeOfParallelism;
optionsCopy.MaxDegreeOfParallelism = maxDop;
optionsCopy.BoundedCapacity = maxDop;
middle = new TransformBlock<TInput, TOutput>(transform, optionsCopy);
link1 = input.LinkTo(middle, propagateCompletion);
link2 = middle.LinkTo(output, propagateCompletion);
localMiddle = middle;
}
PropagateFailure(localMiddle, input); // Non disposable link, but it doesn't
// matter because the completion of the middle must be awaited anyway.
}
void OnMaxDopChanged(object sender, EventArgs e)
{
// Detach the middle block if it's not already detached
IDataflowBlock localMiddle;
lock (locker)
{
if (link1 == null) return;
link1.Dispose();
link2.Dispose();
link1 = null;
link2 = middle.LinkTo(output); // Without completion propagation
localMiddle = middle;
}
localMiddle.Complete();
OnCompletion(localMiddle, () => CreateMiddleBlock(), output);
}
async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2)
{
try { await block1.Completion.ConfigureAwait(false); }
catch (Exception ex) { if (!block1.Completion.IsCanceled) block2.Fault(ex); }
}
async void OnCompletion(IDataflowBlock block, Action action, IDataflowBlock onError)
{
try { await block.Completion.ConfigureAwait(false); }
catch { } // Ignore
finally
{
try { action(); }
catch (Exception ex) { if (onError != null) onError.Fault(ex); else throw; }
}
}
}
// Overload with synchronous delegate
public static IPropagatorBlock<TInput, TOutput>
CreateVariableDopPropagator<TInput, TOutput>(
Func<TInput, TOutput> transform,
VariableDopExecutionDataflowBlockOptions dataflowBlockOptions)
{
return CreateVariableDopPropagator<TInput, TOutput>(
item => Task.FromResult(transform(item)), dataflowBlockOptions);
}
用法示例:
private VariableDopExecutionDataflowBlockOptions _options;
private IPropagatorBlock<string, bool> _urlChecker;
private HttpClient _client = new HttpClient();
public Form1()
{
InitializeComponent();
_options = new VariableDopExecutionDataflowBlockOptions();
_urlChecker = CreateVariableDopPropagator<string, bool>(async (url) =>
{
return (await _client.GetAsync(url)).IsSuccessStatusCode;
}, _options);
}
private void ListBox1_SelectedValueChanged(object sender, EventArgs e)
{
_options.MaxDegreeOfParallelism = (int)ListBox1.SelectedValue;
}
我已经使用 TPL 数据流创建了一个小型管道。它由链接到 ActionBlock
的 TransformBlock
组成。这是设置的样子:
var transformBlock = new TransformBlock<int, string>(async num=>
{
// Do stuff.
return num.ToString();
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
});
var actionBlock = new ActionBlock<string>(text=>
{
Console.WriteLine(text);
});
transformBlock.LinkTo(actionBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
我想更改变换块实例的 MaxDegreeOfParallelism
。我还没有找到这样做的方法,因为没有 public 属性 允许我更改它。有办法吗?
根据 MSDN:
Dataflow block capture the state of the options at their construction. Subsequent changes to the provided ExecutionDataflowBlockOptions instance should not affect the behavior of a dataflow block.
所以,回答你的问题,
I would like to alter the
MaxDegreeOfParallelism
of the transform block instance.
它不是为了让现有的块改变它的选项。您应该创建新的或创建新的管道,或者从一开始就调整此设置以获得最大利润。
我试图通过使用夹在两个 BufferBlock
之间的可互换 TransformBlock
来解决这个问题。每次 MaxDegreeOfParallelism
改变时,旧的 TransformBlock
被分离,标记为完成,等待它完成,然后一个新的 TransformBlock
被附加到它的位置。此解决方案有两个缺点:
在更换中间块的间歇阶段,并行度逐渐降为零。
中间块的
BoundedCapacity
必须设置为等于MaxDegreeOfParallelism
,这可能会影响非常精细的工作负载的性能。
这里是 CreateVariableDopPropagator
方法,它的同伴 VariableDopExecutionDataflowBlockOptions
class:
public class VariableDopExecutionDataflowBlockOptions : ExecutionDataflowBlockOptions
{
private readonly object _locker = new object();
public event EventHandler MaxDegreeOfParallelismChanged;
/// <summary>Gets the maximum number of messages that may be processed by the
/// block concurrently.</summary>
public new int MaxDegreeOfParallelism
{
get { lock (_locker) return base.MaxDegreeOfParallelism; }
set
{
bool changed;
lock (_locker)
{
changed = value != base.MaxDegreeOfParallelism;
base.MaxDegreeOfParallelism = value;
}
if (changed) MaxDegreeOfParallelismChanged?.Invoke(this, EventArgs.Empty);
}
}
}
public static IPropagatorBlock<TInput, TOutput>
CreateVariableDopPropagator<TInput, TOutput>(
Func<TInput, Task<TOutput>> transform,
VariableDopExecutionDataflowBlockOptions dataflowBlockOptions)
{
if (transform == null) throw new ArgumentNullException(nameof(transform));
if (dataflowBlockOptions == null)
throw new ArgumentNullException(nameof(dataflowBlockOptions));
var optionsCopy = new ExecutionDataflowBlockOptions()
{
BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
CancellationToken = dataflowBlockOptions.CancellationToken,
EnsureOrdered = dataflowBlockOptions.EnsureOrdered,
MaxDegreeOfParallelism = dataflowBlockOptions.MaxDegreeOfParallelism,
MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask,
NameFormat = dataflowBlockOptions.NameFormat,
SingleProducerConstrained = dataflowBlockOptions.SingleProducerConstrained,
TaskScheduler = dataflowBlockOptions.TaskScheduler,
};
var locker = new object();
var input = new BufferBlock<TInput>(optionsCopy);
var output = new BufferBlock<TOutput>(optionsCopy);
PropagateFailure(output, input);
var propagateCompletion = new DataflowLinkOptions() { PropagateCompletion = true };
TransformBlock<TInput, TOutput> middle;
IDisposable link1 = null;
IDisposable link2 = null;
CreateMiddleBlock();
dataflowBlockOptions.MaxDegreeOfParallelismChanged += OnMaxDopChanged;
OnCompletion(output, () =>
{
dataflowBlockOptions.MaxDegreeOfParallelismChanged -= OnMaxDopChanged;
}, null);
return DataflowBlock.Encapsulate(input, output);
void CreateMiddleBlock()
{
IDataflowBlock localMiddle;
lock (locker)
{
link1?.Dispose();
link2?.Dispose();
var maxDop = dataflowBlockOptions.MaxDegreeOfParallelism;
optionsCopy.MaxDegreeOfParallelism = maxDop;
optionsCopy.BoundedCapacity = maxDop;
middle = new TransformBlock<TInput, TOutput>(transform, optionsCopy);
link1 = input.LinkTo(middle, propagateCompletion);
link2 = middle.LinkTo(output, propagateCompletion);
localMiddle = middle;
}
PropagateFailure(localMiddle, input); // Non disposable link, but it doesn't
// matter because the completion of the middle must be awaited anyway.
}
void OnMaxDopChanged(object sender, EventArgs e)
{
// Detach the middle block if it's not already detached
IDataflowBlock localMiddle;
lock (locker)
{
if (link1 == null) return;
link1.Dispose();
link2.Dispose();
link1 = null;
link2 = middle.LinkTo(output); // Without completion propagation
localMiddle = middle;
}
localMiddle.Complete();
OnCompletion(localMiddle, () => CreateMiddleBlock(), output);
}
async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2)
{
try { await block1.Completion.ConfigureAwait(false); }
catch (Exception ex) { if (!block1.Completion.IsCanceled) block2.Fault(ex); }
}
async void OnCompletion(IDataflowBlock block, Action action, IDataflowBlock onError)
{
try { await block.Completion.ConfigureAwait(false); }
catch { } // Ignore
finally
{
try { action(); }
catch (Exception ex) { if (onError != null) onError.Fault(ex); else throw; }
}
}
}
// Overload with synchronous delegate
public static IPropagatorBlock<TInput, TOutput>
CreateVariableDopPropagator<TInput, TOutput>(
Func<TInput, TOutput> transform,
VariableDopExecutionDataflowBlockOptions dataflowBlockOptions)
{
return CreateVariableDopPropagator<TInput, TOutput>(
item => Task.FromResult(transform(item)), dataflowBlockOptions);
}
用法示例:
private VariableDopExecutionDataflowBlockOptions _options;
private IPropagatorBlock<string, bool> _urlChecker;
private HttpClient _client = new HttpClient();
public Form1()
{
InitializeComponent();
_options = new VariableDopExecutionDataflowBlockOptions();
_urlChecker = CreateVariableDopPropagator<string, bool>(async (url) =>
{
return (await _client.GetAsync(url)).IsSuccessStatusCode;
}, _options);
}
private void ListBox1_SelectedValueChanged(object sender, EventArgs e)
{
_options.MaxDegreeOfParallelism = (int)ListBox1.SelectedValue;
}