如何更改现有 TPL 块的 MaxDegreeOfParallelism?

How to change the MaxDegreeOfParallelism of an existing TPL block?

我已经使用 TPL 数据流创建了一个小型管道。它由链接到 ActionBlockTransformBlock 组成。这是设置的样子:

var transformBlock = new TransformBlock<int, string>(async num=>
{
    //  Do stuff.
    return num.ToString();
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount,
});

var actionBlock = new ActionBlock<string>(text=>
{
    Console.WriteLine(text);
});

transformBlock.LinkTo(actionBlock, new DataflowLinkOptions
{
    PropagateCompletion = true
});

我想更改变换块实例的 MaxDegreeOfParallelism。我还没有找到这样做的方法,因为没有 public 属性 允许我更改它。有办法吗?

根据 MSDN:

Dataflow block capture the state of the options at their construction. Subsequent changes to the provided ExecutionDataflowBlockOptions instance should not affect the behavior of a dataflow block.

所以,回答你的问题,

I would like to alter the MaxDegreeOfParallelism of the transform block instance.

它不是为了让现有的块改变它的选项。您应该创建新的或创建新的管道,或者从一开始就调整此设置以获得最大利润。

我试图通过使用夹在两个 BufferBlock 之间的可互换 TransformBlock 来解决这个问题。每次 MaxDegreeOfParallelism 改变时,旧的 TransformBlock 被分离,标记为完成,等待它完成,然后一个新的 TransformBlock 被附加到它的位置。此解决方案有两个缺点:

  1. 在更换中间块的间歇阶段,并行度逐渐降为零。

  2. 中间块的 BoundedCapacity 必须设置为等于 MaxDegreeOfParallelism,这可能会影响非常精细的工作负载的性能。

这里是 CreateVariableDopPropagator 方法,它的同伴 VariableDopExecutionDataflowBlockOptions class:

public class VariableDopExecutionDataflowBlockOptions : ExecutionDataflowBlockOptions
{
    private readonly object _locker = new object();

    public event EventHandler MaxDegreeOfParallelismChanged;

    /// <summary>Gets the maximum number of messages that may be processed by the
    /// block concurrently.</summary>
    public new int MaxDegreeOfParallelism
    {
        get { lock (_locker) return base.MaxDegreeOfParallelism; }
        set
        {
            bool changed;
            lock (_locker)
            {
                changed = value != base.MaxDegreeOfParallelism;
                base.MaxDegreeOfParallelism = value;
            }
            if (changed) MaxDegreeOfParallelismChanged?.Invoke(this, EventArgs.Empty);
        }
    }
}

public static IPropagatorBlock<TInput, TOutput>
    CreateVariableDopPropagator<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    VariableDopExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    if (dataflowBlockOptions == null)
        throw new ArgumentNullException(nameof(dataflowBlockOptions));

    var optionsCopy = new ExecutionDataflowBlockOptions()
    {
        BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
        CancellationToken = dataflowBlockOptions.CancellationToken,
        EnsureOrdered = dataflowBlockOptions.EnsureOrdered,
        MaxDegreeOfParallelism = dataflowBlockOptions.MaxDegreeOfParallelism,
        MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask,
        NameFormat = dataflowBlockOptions.NameFormat,
        SingleProducerConstrained = dataflowBlockOptions.SingleProducerConstrained,
        TaskScheduler = dataflowBlockOptions.TaskScheduler,
    };

    var locker = new object();
    var input = new BufferBlock<TInput>(optionsCopy);
    var output = new BufferBlock<TOutput>(optionsCopy);
    PropagateFailure(output, input);

    var propagateCompletion = new DataflowLinkOptions() { PropagateCompletion = true };
    TransformBlock<TInput, TOutput> middle;
    IDisposable link1 = null;
    IDisposable link2 = null;
    CreateMiddleBlock();

    dataflowBlockOptions.MaxDegreeOfParallelismChanged += OnMaxDopChanged;
    OnCompletion(output, () =>
    {
        dataflowBlockOptions.MaxDegreeOfParallelismChanged -= OnMaxDopChanged;
    }, null);

    return DataflowBlock.Encapsulate(input, output);

    void CreateMiddleBlock()
    {
        IDataflowBlock localMiddle;
        lock (locker)
        {
            link1?.Dispose();
            link2?.Dispose();
            var maxDop = dataflowBlockOptions.MaxDegreeOfParallelism;
            optionsCopy.MaxDegreeOfParallelism = maxDop;
            optionsCopy.BoundedCapacity = maxDop;
            middle = new TransformBlock<TInput, TOutput>(transform, optionsCopy);
            link1 = input.LinkTo(middle, propagateCompletion);
            link2 = middle.LinkTo(output, propagateCompletion);
            localMiddle = middle;
        }
        PropagateFailure(localMiddle, input); // Non disposable link, but it doesn't
        // matter because the completion of the middle must be awaited anyway.
    }

    void OnMaxDopChanged(object sender, EventArgs e)
    {
        // Detach the middle block if it's not already detached
        IDataflowBlock localMiddle;
        lock (locker)
        {
            if (link1 == null) return;
            link1.Dispose();
            link2.Dispose();
            link1 = null;
            link2 = middle.LinkTo(output); // Without completion propagation
            localMiddle = middle;
        }
        localMiddle.Complete();
        OnCompletion(localMiddle, () => CreateMiddleBlock(), output);
    }

    async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2)
    {
        try { await block1.Completion.ConfigureAwait(false); }
        catch (Exception ex) { if (!block1.Completion.IsCanceled) block2.Fault(ex); }
    }

    async void OnCompletion(IDataflowBlock block, Action action, IDataflowBlock onError)
    {
        try { await block.Completion.ConfigureAwait(false); }
        catch { } // Ignore
        finally
        {
            try { action(); }
            catch (Exception ex) { if (onError != null) onError.Fault(ex); else throw; }
        }
    }
}

// Overload with synchronous delegate
public static IPropagatorBlock<TInput, TOutput>
    CreateVariableDopPropagator<TInput, TOutput>(
    Func<TInput, TOutput> transform,
    VariableDopExecutionDataflowBlockOptions dataflowBlockOptions)
{
    return CreateVariableDopPropagator<TInput, TOutput>(
        item => Task.FromResult(transform(item)), dataflowBlockOptions);
}

用法示例:

private VariableDopExecutionDataflowBlockOptions _options;
private IPropagatorBlock<string, bool> _urlChecker;
private HttpClient _client = new HttpClient();

public Form1()
{
    InitializeComponent();
    _options = new VariableDopExecutionDataflowBlockOptions();
    _urlChecker = CreateVariableDopPropagator<string, bool>(async (url) =>
    {
        return (await _client.GetAsync(url)).IsSuccessStatusCode;
    }, _options);
}

private void ListBox1_SelectedValueChanged(object sender, EventArgs e)
{
    _options.MaxDegreeOfParallelism = (int)ListBox1.SelectedValue;
}