为什么任务不并行进行?

Why tasks are not going in parallel?

我有多个位置(称为 Cells)运行 进行测试。测试作为异步任务实现,因此 运行ning。用户可以 select 到 运行 对每个单元格进行任何测试。如果我 select 到 运行 对所有单元格进行相同的完全相同的测试,那么它或多或少会并行。

进行测试 A, B, C,如果在单元格 1 和 2 上我 select 测试 A, B 并且在单元格 3 上我 select 仅 C,那么对于一些单元格 1 和单元格 2 中的原因测试将开始 运行ning,但单元格 3 中的测试 C 不会开始,直到单元格 1 和单元格 2 中的 A 和 B 测试不会完成。基本上所有单元格中的所有测试都倾向于以相同的顺序 运行。那不是我想要的。我试图实现的是 运行 独立于每个单元格的测试链。现在我将展示我是如何实现的。

private async void buttonStartTest_Click(object sender, EventArgs e)
{
    var cells = objectListView.CheckedObjects.Cast<Cell>().ToList();
    if (cells == null)
        return;

    var blockPrepare = CreateExceptionCatchingTransformBlock(new Func<Cell, Task<Cell>>(Tests.Prepare), new Action<Exception, Cell>(HandleUnhandledException), new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10000,
        MaxDegreeOfParallelism = 40,
    });

    var blockFinalize = CreateExceptionCatchingActionBlock(new Func<Cell, Task>(Tests.Finalize), new Action<Exception, Cell>(HandleUnhandledException), new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10000,
        MaxDegreeOfParallelism = 40,
    });

    List<IPropagatorBlock<Cell, Cell>> blockList = new List<IPropagatorBlock<Cell, Cell>>();
    var funcs = tests.Select(x => x.Value);
    foreach (var func in funcs)
    {
        var blockNew = CreateExceptionCatchingTransformBlock(new Func<Cell, Task<Cell>>(func), new Action<Exception, Cell>(HandleUnhandledException), new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 10000,
            MaxDegreeOfParallelism = 40,
        });
        blockList.Add(blockNew);
    }

    // link
    for (int i = 0; i < blockList.Count - 1; i++)
    {
        var b1 = blockList[i];
        var b2 = blockList[i + 1];
        b1.LinkTo(b2);
    }

    // link first and last
    blockPrepare.LinkTo(blockList[0], new DataflowLinkOptions { PropagateCompletion = true });
    blockList[blockList.Count - 1].LinkTo(blockFinalize, new DataflowLinkOptions { PropagateCompletion = true });

    foreach (Cell c in cells)
    {
        c.Reset();
        c.State = Cell.States.InProgress;
        var progressHandler = new Progress<string>(value =>
        {
            c.Status = value;
        });

        c.Progress = progressHandler as IProgress<string>;
        blockPrepare.Post(c);
    };

    blockPrepare.Complete();
    try
    {
        await blockFinalize.Completion;
    }
    catch (Exception ex)
    {
        logger.Debug(ex.InnerException.InnerException.Message);
    }
}

在上方您可以看到每个单元格有 2 个强制块 - 准备和完成。这是我创建它们的方法:

public IPropagatorBlock<TInput, TOutput> CreateExceptionCatchingTransformBlock<TInput, TOutput>(
                Func<TInput, Task<TOutput>> transform,
                Action<Exception, Cell> exceptionHandler,
                ExecutionDataflowBlockOptions dataflowBlockOptions)
{
    return new TransformManyBlock<TInput, TOutput>(async input =>
    {
        try
        {
            var result = await transform(input);
            return new[] { result };
        }
        catch (Exception ex)
        {
            exceptionHandler(ex, (input as Cell));

            return Enumerable.Empty<TOutput>();
        }
    }, dataflowBlockOptions);
}

public ITargetBlock<TInput> CreateExceptionCatchingActionBlock<TInput>(
                Func<TInput, Task> action,
                Action<Exception, Cell> exceptionHandler,
                ExecutionDataflowBlockOptions dataflowBlockOptions)
{
    return new ActionBlock<TInput>(async input =>
    {
        try
        {
            await action(input);
        }
        catch (Exception ex)
        {
            exceptionHandler(ex, (input as Cell));
        }
    }, dataflowBlockOptions);
}

测试本身看起来像这样:

public static async Task<Cell> TestDoorsAsync(Cell c)
{
    int thisTestID = TEST_DOORS;
    TestConfiguration conf = c.GetConfiguration(thisTestID);
    if (conf.Enabled)
    {
       ... // execute test
    }
    else
    {
       // report that test was skipped due to user configuration
    }

    return c;
}

那么是不是我遗漏了某些选项或者软件设计有误,这会阻止 运行 单元格中的测试而不等待其他单元格中的测试完成?

更新

repo 是演示该问题的最小控制台应用程序。

还有3个单元格和3个测试(任务)。在单元格 1、2 上,我 select 正在 运行 所有测试,而在单元格 3 上只测试 3。我期望在单元格 3 的准备任务之后立即看到跳过的测试 1、2和 运行宁测试 3.

我看到的是(# ​​- 手机号)

#1 Preparing...
#2 Preparing...
#3 Preparing...

#1 Test1 running...
#2 Test1 running...
#3 Test1 skipped
#1 Test2 running...
#2 Test2 running...
#3 Test2 skipped
#1 Test3 running...
#2 Test3 running...
#3 Test3 running...

#2 Finalizing...
#1 Finalizing...
#3 Finalizing...

单元格 3 中的测试与单元格 1 和单元格 2 中的测试同步。所有测试同时完成,而单元格 3 中的单个测试应该比其他单元格更早完成。

很难确定,但我可以肯定地在您的代码中看到两个缺陷:

  1. 您没有在列表中的变换块之间传播完成
  2. 您正在使用阻塞同步方法来传递消息:.Post 而不是 SendAsync,这显然是您在这里获得异步流所需要的。所以最后一个必须等​​到第一个完成。

此外,您需要了解使用 BoundedCapacity 会在您的管道中引入节流,因此您应该检查缓冲区大小,也许很多线程只是在等待队列中的某个位置可用。

您可以尝试的另一件事是调平 DataflowBlockOptions.MaxMessagesPerTask 属性。这个 属性 用于一个贪婪块执行得非常快,并处理越来越多的消息,而不让其他块完成它们的工作的情况。在内部,每个块都有一个Task,里面正在做处理,默认是-1,表示无限条消息。通过将其设置为某个正数,您可以强制块重新启动其内部任务并向其他任务提供一些 space。

更多进阶技巧,请参考official docs

感谢您的编辑。将 EnsureOrdered = false 添加到阻止选项。发生的情况是,您的 TransfomrBlocks 在处理完所有单元格之前不会传递这些单元格,因此它们可以维持您的订单。这是默认的,通常更可取,但不是你的情况。

当我评论说他们在当前代码中没有错时,我好像错了。