为什么任务不并行进行?
Why tasks are not going in parallel?
我有多个位置(称为 Cells)运行 进行测试。测试作为异步任务实现,因此 运行ning。用户可以 select 到 运行 对每个单元格进行任何测试。如果我 select 到 运行 对所有单元格进行相同的完全相同的测试,那么它或多或少会并行。
进行测试 A, B, C
,如果在单元格 1 和 2 上我 select 测试 A, B
并且在单元格 3 上我 select 仅 C
,那么对于一些单元格 1 和单元格 2 中的原因测试将开始 运行ning,但单元格 3 中的测试 C 不会开始,直到单元格 1 和单元格 2 中的 A 和 B 测试不会完成。基本上所有单元格中的所有测试都倾向于以相同的顺序 运行。那不是我想要的。我试图实现的是 运行 独立于每个单元格的测试链。现在我将展示我是如何实现的。
private async void buttonStartTest_Click(object sender, EventArgs e)
{
var cells = objectListView.CheckedObjects.Cast<Cell>().ToList();
if (cells == null)
return;
var blockPrepare = CreateExceptionCatchingTransformBlock(new Func<Cell, Task<Cell>>(Tests.Prepare), new Action<Exception, Cell>(HandleUnhandledException), new ExecutionDataflowBlockOptions
{
BoundedCapacity = 10000,
MaxDegreeOfParallelism = 40,
});
var blockFinalize = CreateExceptionCatchingActionBlock(new Func<Cell, Task>(Tests.Finalize), new Action<Exception, Cell>(HandleUnhandledException), new ExecutionDataflowBlockOptions
{
BoundedCapacity = 10000,
MaxDegreeOfParallelism = 40,
});
List<IPropagatorBlock<Cell, Cell>> blockList = new List<IPropagatorBlock<Cell, Cell>>();
var funcs = tests.Select(x => x.Value);
foreach (var func in funcs)
{
var blockNew = CreateExceptionCatchingTransformBlock(new Func<Cell, Task<Cell>>(func), new Action<Exception, Cell>(HandleUnhandledException), new ExecutionDataflowBlockOptions
{
BoundedCapacity = 10000,
MaxDegreeOfParallelism = 40,
});
blockList.Add(blockNew);
}
// link
for (int i = 0; i < blockList.Count - 1; i++)
{
var b1 = blockList[i];
var b2 = blockList[i + 1];
b1.LinkTo(b2);
}
// link first and last
blockPrepare.LinkTo(blockList[0], new DataflowLinkOptions { PropagateCompletion = true });
blockList[blockList.Count - 1].LinkTo(blockFinalize, new DataflowLinkOptions { PropagateCompletion = true });
foreach (Cell c in cells)
{
c.Reset();
c.State = Cell.States.InProgress;
var progressHandler = new Progress<string>(value =>
{
c.Status = value;
});
c.Progress = progressHandler as IProgress<string>;
blockPrepare.Post(c);
};
blockPrepare.Complete();
try
{
await blockFinalize.Completion;
}
catch (Exception ex)
{
logger.Debug(ex.InnerException.InnerException.Message);
}
}
在上方您可以看到每个单元格有 2 个强制块 - 准备和完成。这是我创建它们的方法:
public IPropagatorBlock<TInput, TOutput> CreateExceptionCatchingTransformBlock<TInput, TOutput>(
Func<TInput, Task<TOutput>> transform,
Action<Exception, Cell> exceptionHandler,
ExecutionDataflowBlockOptions dataflowBlockOptions)
{
return new TransformManyBlock<TInput, TOutput>(async input =>
{
try
{
var result = await transform(input);
return new[] { result };
}
catch (Exception ex)
{
exceptionHandler(ex, (input as Cell));
return Enumerable.Empty<TOutput>();
}
}, dataflowBlockOptions);
}
public ITargetBlock<TInput> CreateExceptionCatchingActionBlock<TInput>(
Func<TInput, Task> action,
Action<Exception, Cell> exceptionHandler,
ExecutionDataflowBlockOptions dataflowBlockOptions)
{
return new ActionBlock<TInput>(async input =>
{
try
{
await action(input);
}
catch (Exception ex)
{
exceptionHandler(ex, (input as Cell));
}
}, dataflowBlockOptions);
}
测试本身看起来像这样:
public static async Task<Cell> TestDoorsAsync(Cell c)
{
int thisTestID = TEST_DOORS;
TestConfiguration conf = c.GetConfiguration(thisTestID);
if (conf.Enabled)
{
... // execute test
}
else
{
// report that test was skipped due to user configuration
}
return c;
}
那么是不是我遗漏了某些选项或者软件设计有误,这会阻止 运行 单元格中的测试而不等待其他单元格中的测试完成?
更新
repo 是演示该问题的最小控制台应用程序。
还有3个单元格和3个测试(任务)。在单元格 1、2 上,我 select 正在 运行 所有测试,而在单元格 3 上只测试 3。我期望在单元格 3 的准备任务之后立即看到跳过的测试 1、2和 运行宁测试 3.
我看到的是(# - 手机号)
#1 Preparing...
#2 Preparing...
#3 Preparing...
#1 Test1 running...
#2 Test1 running...
#3 Test1 skipped
#1 Test2 running...
#2 Test2 running...
#3 Test2 skipped
#1 Test3 running...
#2 Test3 running...
#3 Test3 running...
#2 Finalizing...
#1 Finalizing...
#3 Finalizing...
单元格 3 中的测试与单元格 1 和单元格 2 中的测试同步。所有测试同时完成,而单元格 3 中的单个测试应该比其他单元格更早完成。
很难确定,但我可以肯定地在您的代码中看到两个缺陷:
- 您没有在列表中的变换块之间传播完成
- 您正在使用阻塞同步方法来传递消息:
.Post
而不是 SendAsync
,这显然是您在这里获得异步流所需要的。所以最后一个必须等到第一个完成。
此外,您需要了解使用 BoundedCapacity
会在您的管道中引入节流,因此您应该检查缓冲区大小,也许很多线程只是在等待队列中的某个位置可用。
您可以尝试的另一件事是调平 DataflowBlockOptions.MaxMessagesPerTask
属性。这个 属性 用于一个贪婪块执行得非常快,并处理越来越多的消息,而不让其他块完成它们的工作的情况。在内部,每个块都有一个Task
,里面正在做处理,默认是-1,表示无限条消息。通过将其设置为某个正数,您可以强制块重新启动其内部任务并向其他任务提供一些 space。
更多进阶技巧,请参考official docs。
感谢您的编辑。将 EnsureOrdered = false
添加到阻止选项。发生的情况是,您的 TransfomrBlocks
在处理完所有单元格之前不会传递这些单元格,因此它们可以维持您的订单。这是默认的,通常更可取,但不是你的情况。
当我评论说他们在当前代码中没有错时,我好像错了。
我有多个位置(称为 Cells)运行 进行测试。测试作为异步任务实现,因此 运行ning。用户可以 select 到 运行 对每个单元格进行任何测试。如果我 select 到 运行 对所有单元格进行相同的完全相同的测试,那么它或多或少会并行。
进行测试 A, B, C
,如果在单元格 1 和 2 上我 select 测试 A, B
并且在单元格 3 上我 select 仅 C
,那么对于一些单元格 1 和单元格 2 中的原因测试将开始 运行ning,但单元格 3 中的测试 C 不会开始,直到单元格 1 和单元格 2 中的 A 和 B 测试不会完成。基本上所有单元格中的所有测试都倾向于以相同的顺序 运行。那不是我想要的。我试图实现的是 运行 独立于每个单元格的测试链。现在我将展示我是如何实现的。
private async void buttonStartTest_Click(object sender, EventArgs e)
{
var cells = objectListView.CheckedObjects.Cast<Cell>().ToList();
if (cells == null)
return;
var blockPrepare = CreateExceptionCatchingTransformBlock(new Func<Cell, Task<Cell>>(Tests.Prepare), new Action<Exception, Cell>(HandleUnhandledException), new ExecutionDataflowBlockOptions
{
BoundedCapacity = 10000,
MaxDegreeOfParallelism = 40,
});
var blockFinalize = CreateExceptionCatchingActionBlock(new Func<Cell, Task>(Tests.Finalize), new Action<Exception, Cell>(HandleUnhandledException), new ExecutionDataflowBlockOptions
{
BoundedCapacity = 10000,
MaxDegreeOfParallelism = 40,
});
List<IPropagatorBlock<Cell, Cell>> blockList = new List<IPropagatorBlock<Cell, Cell>>();
var funcs = tests.Select(x => x.Value);
foreach (var func in funcs)
{
var blockNew = CreateExceptionCatchingTransformBlock(new Func<Cell, Task<Cell>>(func), new Action<Exception, Cell>(HandleUnhandledException), new ExecutionDataflowBlockOptions
{
BoundedCapacity = 10000,
MaxDegreeOfParallelism = 40,
});
blockList.Add(blockNew);
}
// link
for (int i = 0; i < blockList.Count - 1; i++)
{
var b1 = blockList[i];
var b2 = blockList[i + 1];
b1.LinkTo(b2);
}
// link first and last
blockPrepare.LinkTo(blockList[0], new DataflowLinkOptions { PropagateCompletion = true });
blockList[blockList.Count - 1].LinkTo(blockFinalize, new DataflowLinkOptions { PropagateCompletion = true });
foreach (Cell c in cells)
{
c.Reset();
c.State = Cell.States.InProgress;
var progressHandler = new Progress<string>(value =>
{
c.Status = value;
});
c.Progress = progressHandler as IProgress<string>;
blockPrepare.Post(c);
};
blockPrepare.Complete();
try
{
await blockFinalize.Completion;
}
catch (Exception ex)
{
logger.Debug(ex.InnerException.InnerException.Message);
}
}
在上方您可以看到每个单元格有 2 个强制块 - 准备和完成。这是我创建它们的方法:
public IPropagatorBlock<TInput, TOutput> CreateExceptionCatchingTransformBlock<TInput, TOutput>(
Func<TInput, Task<TOutput>> transform,
Action<Exception, Cell> exceptionHandler,
ExecutionDataflowBlockOptions dataflowBlockOptions)
{
return new TransformManyBlock<TInput, TOutput>(async input =>
{
try
{
var result = await transform(input);
return new[] { result };
}
catch (Exception ex)
{
exceptionHandler(ex, (input as Cell));
return Enumerable.Empty<TOutput>();
}
}, dataflowBlockOptions);
}
public ITargetBlock<TInput> CreateExceptionCatchingActionBlock<TInput>(
Func<TInput, Task> action,
Action<Exception, Cell> exceptionHandler,
ExecutionDataflowBlockOptions dataflowBlockOptions)
{
return new ActionBlock<TInput>(async input =>
{
try
{
await action(input);
}
catch (Exception ex)
{
exceptionHandler(ex, (input as Cell));
}
}, dataflowBlockOptions);
}
测试本身看起来像这样:
public static async Task<Cell> TestDoorsAsync(Cell c)
{
int thisTestID = TEST_DOORS;
TestConfiguration conf = c.GetConfiguration(thisTestID);
if (conf.Enabled)
{
... // execute test
}
else
{
// report that test was skipped due to user configuration
}
return c;
}
那么是不是我遗漏了某些选项或者软件设计有误,这会阻止 运行 单元格中的测试而不等待其他单元格中的测试完成?
更新
repo 是演示该问题的最小控制台应用程序。
还有3个单元格和3个测试(任务)。在单元格 1、2 上,我 select 正在 运行 所有测试,而在单元格 3 上只测试 3。我期望在单元格 3 的准备任务之后立即看到跳过的测试 1、2和 运行宁测试 3.
我看到的是(# - 手机号)
#1 Preparing...
#2 Preparing...
#3 Preparing...
#1 Test1 running...
#2 Test1 running...
#3 Test1 skipped
#1 Test2 running...
#2 Test2 running...
#3 Test2 skipped
#1 Test3 running...
#2 Test3 running...
#3 Test3 running...
#2 Finalizing...
#1 Finalizing...
#3 Finalizing...
单元格 3 中的测试与单元格 1 和单元格 2 中的测试同步。所有测试同时完成,而单元格 3 中的单个测试应该比其他单元格更早完成。
很难确定,但我可以肯定地在您的代码中看到两个缺陷:
- 您没有在列表中的变换块之间传播完成
- 您正在使用阻塞同步方法来传递消息:
.Post
而不是SendAsync
,这显然是您在这里获得异步流所需要的。所以最后一个必须等到第一个完成。
此外,您需要了解使用 BoundedCapacity
会在您的管道中引入节流,因此您应该检查缓冲区大小,也许很多线程只是在等待队列中的某个位置可用。
您可以尝试的另一件事是调平 DataflowBlockOptions.MaxMessagesPerTask
属性。这个 属性 用于一个贪婪块执行得非常快,并处理越来越多的消息,而不让其他块完成它们的工作的情况。在内部,每个块都有一个Task
,里面正在做处理,默认是-1,表示无限条消息。通过将其设置为某个正数,您可以强制块重新启动其内部任务并向其他任务提供一些 space。
更多进阶技巧,请参考official docs。
感谢您的编辑。将 EnsureOrdered = false
添加到阻止选项。发生的情况是,您的 TransfomrBlocks
在处理完所有单元格之前不会传递这些单元格,因此它们可以维持您的订单。这是默认的,通常更可取,但不是你的情况。
当我评论说他们在当前代码中没有错时,我好像错了。