如何恢复打乱的数据流管道的顺序?

How to restore the order of a shuffled Dataflow pipeline?

我有一个数据流管道,它由多个处理异构文档(XLS、PDF 等)的块组成。每种类型的文件都由专门的 TransformBlock 处理。在管道的末端,我有一个 ActionBlock 接收所有已处理的文档,并将它们一个接一个地上传到 Web 服务器。我的问题是我无法找到一种方法来满足按照文件最初输入管道的相同顺序上传文件的要求。例如,我不能使用 EnsureOrdered 选项来发挥我的优势,因为此选项配置单个块的行为,而不是并行工作的多个块的行为。我的要求是:

  1. 按特定顺序将文档插入管道。
  2. 根据文档的类型以不同方式处理每个文档。
  3. 应按顺序处理特定类型的文档。
  4. 可以(并且应该)并行处理不同类型的文档。
  5. 所有文件在处理后应尽快上传。
  6. 文档必须按顺序上传,并且按照它们在管道中输入的顺序。

例如要求文档#8必须在文档#7之后上传,即使它在文档#7之前被处理。

第五个需求意思是我等不及所有的文档都处理完了,然后按索引排序,最后上传。上传必须与处理同时进行。

这是我正在尝试做的一个最小示例。为简单起见,我没有为块提供 IDocument 接口的实例,而是使用简单的整数。每个整数的值代表它进入管道的顺序,以及必须上传的顺序:

var xlsBlock = new TransformBlock<int, int>(document =>
{
    int duration = 300 + document % 3 * 300;
    Thread.Sleep(duration); // Simulate CPU-bound work
    return document;
});
var pdfBlock = new TransformBlock<int, int>(document =>
{
    int duration = 100 + document % 5 * 200;
    Thread.Sleep(duration); // Simulate CPU-bound work
    return document;
});

var uploader = new ActionBlock<int>(async document =>
{
    Console.WriteLine($"Uploading document #{document}");
    await Task.Delay(500); // Simulate I/O-bound work
});

xlsBlock.LinkTo(uploader);
pdfBlock.LinkTo(uploader);

foreach (var document in Enumerable.Range(1, 10))
{
    if (document % 2 == 0)
        xlsBlock.Post(document);
    else
        pdfBlock.Post(document);
}
xlsBlock.Complete();
pdfBlock.Complete();
_ = Task.WhenAll(xlsBlock.Completion, pdfBlock.Completion)
    .ContinueWith(_ => uploader.Complete());

await uploader.Completion;

输出为:

Uploading document #1
Uploading document #2
Uploading document #3
Uploading document #5
Uploading document #4
Uploading document #7
Uploading document #6
Uploading document #9
Uploading document #8
Uploading document #10

(Try it on Fiddle)

理想的顺序是#1、#2、#3、#4、#5、#6、#7、#8、#9、#10。

如何在将已处理文档发送到 uploader 块之前恢复它们的顺序?

澄清: 通过用单个通用 TransformBlock 替换多个特定的 TransformBlock 来彻底改变管道的架构不是一种选择.理想的情况是在处理器和上传者之间拦截一个块,这将恢复文档的顺序。

uploader 应该将文档添加到某个已排序的已完成文档列表中,并检查添加的文档是否是下一个应该上传的文档。如果应该,则从排序列表中删除并上传所有文档,直到缺少一个。

还有一个同步问题。对这个排序列表的访问必须跨线程同步。但是您希望所有线程都在做某事,而不是等待其他线程完成它们的工作。所以,uploader 应该像这样处理列表:

  • 在同步锁内向列表添加新文档,然后释放锁
  • 循环中
    • 再次输入相同的同步锁,
    • 如果设置了 upload_in_progress 标志则什么都不做并且 return.
    • 检查是否应上传列表顶部的文档,
      • 如果不是则重置 upload_in_progress 标志,并且 return.
      • 否则从列表中删除文档,
      • 设置upload_in_progress标志,
      • 解除锁定,
      • 上传文档。

希望我没有想错。如您所见,要使其既安全又高效是很棘手的。在大多数情况下,肯定有一种方法只用一个锁就可以做到这一点,但它不会增加太多效率。 upload_in_progress 标志在任务之间共享,就像列表本身一样。

我设法实现了一个数据流块,它可以根据包含已处理文档的排序列表的 Dialecticus 来恢复我的混洗管道的顺序。我最终使用了一个简单的 Dictionary 而不是 SortedList,它似乎也能正常工作。

/// <summary>Creates a dataflow block that restores the order of
/// a shuffled pipeline.</summary>
public static IPropagatorBlock<T, T> CreateRestoreOrderBlock<T>(
    Func<T, long> indexSelector,
    long startingIndex = 0L,
    DataflowBlockOptions options = null)
{
    if (indexSelector == null) throw new ArgumentNullException(nameof(indexSelector));
    var executionOptions = new ExecutionDataflowBlockOptions();
    if (options != null)
    {
        executionOptions.CancellationToken = options.CancellationToken;
        executionOptions.BoundedCapacity = options.BoundedCapacity;
        executionOptions.EnsureOrdered = options.EnsureOrdered;
        executionOptions.TaskScheduler = options.TaskScheduler;
        executionOptions.MaxMessagesPerTask = options.MaxMessagesPerTask;
        executionOptions.NameFormat = options.NameFormat;
    }

    var buffer = new Dictionary<long, T>();
    long minIndex = startingIndex;

    IEnumerable<T> Transform(T item)
    {
        // No synchronization needed because MaxDegreeOfParallelism = 1
        long index = indexSelector(item);
        if (index < startingIndex)
            throw new InvalidOperationException($"Index {index} is out of range.");
        if (index < minIndex)
            throw new InvalidOperationException($"Index {index} has been consumed.");
        if (!buffer.TryAdd(index, item)) // .NET Core only API
            throw new InvalidOperationException($"Index {index} is not unique.");
        while (buffer.Remove(minIndex, out var minItem)) // .NET Core only API
        {
            minIndex++;
            yield return minItem;
        }
    }

    // Ideally the assertion buffer.Count == 0 should be checked on the completion
    // of the block.
    return new TransformManyBlock<T, T>(Transform, executionOptions);
}

用法示例:

var xlsBlock = new TransformBlock<int, int>(document =>
{
    int duration = 300 + document % 3 * 300;
    Thread.Sleep(duration); // Simulate CPU-bound work
    return document;
});
var pdfBlock = new TransformBlock<int, int>(document =>
{
    int duration = 100 + document % 5 * 200;
    Thread.Sleep(duration); // Simulate CPU-bound work
    return document;
});

var orderRestorer = CreateRestoreOrderBlock<int>(
    indexSelector: document => document, startingIndex: 1L);

var uploader = new ActionBlock<int>(async document =>
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Uploading document #{document}");
    await Task.Delay(500); // Simulate I/O-bound work
});

xlsBlock.LinkTo(orderRestorer);
pdfBlock.LinkTo(orderRestorer);
orderRestorer.LinkTo(uploader, new DataflowLinkOptions { PropagateCompletion = true });

foreach (var document in Enumerable.Range(1, 10))
{
    if (document % 2 == 0)
        xlsBlock.Post(document);
    else
        pdfBlock.Post(document);
}
xlsBlock.Complete();
pdfBlock.Complete();
_ = Task.WhenAll(xlsBlock.Completion, pdfBlock.Completion)
    .ContinueWith(_ => orderRestorer.Complete());

await uploader.Completion;

输出:

09:24:18.846 Uploading document #1
09:24:19.436 Uploading document #2
09:24:19.936 Uploading document #3
09:24:20.441 Uploading document #4
09:24:20.942 Uploading document #5
09:24:21.442 Uploading document #6
09:24:21.941 Uploading document #7
09:24:22.441 Uploading document #8
09:24:22.942 Uploading document #9
09:24:23.442 Uploading document #10

(Try it on Fiddle,具有 .NET Framework 兼容版本)