有没有办法等待所有任务直到特定结果为真,然后取消其余的?

Is there a way to wait for all tasks until a specific result is true, and then cancel the rest?

在我的 C# 控制台应用程序中,我尝试 运行 多个任务同时执行各种数据检查。 如果 其中一项任务 return 为真,我应该停止其他任务,因为我得到了可操作的结果。也很有可能 none 函数 return true

我有 运行 任务的代码(我想),我只是无法到达终点线:

Task task1 = Task.Run(() => Task1(stoppingToken));
Task task2 = Task.Run(() => Task2(stoppingToken));
Task task3 = Task.Run(() => Task3(stoppingToken));
Task task4 = Task.Run(() => Task4(stoppingToken));
Task task5 = Task.Run(() => Task5(stoppingToken));
Task task6 = Task.Run(() => Task6(stoppingToken));

Task.WaitAll(task1, task2, task3, task4, task5, task6);

这与已知所需结果(超时值)的链接问题中的答案略有不同。我正在等待这些任务中的任何一个可能 return 为真,然后取消剩余的任务,如果它们仍然 运行ning

假设你的任务 return bool 你可以这样做:

CancellationTokenSource source = new CancellationTokenSource();
CancellationToken stoppingToken = source.Token;
Task<bool> task1 = Task.Run(() => Task1(stoppingToken));
....

var tasks = new List<Task<bool>>
{
    task1, task2, task3, ...
};

bool taskResult = false;
do
{
    var finished = await Task.WhenAny(tasks);
    taskResult = finished.Result;
    tasks.Remove(finished);
} while (tasks.Any() && !taskResult);

source.Cancel();

这是一个基于连续任务的解决方案。这个想法是将延续任务附加到每个原始(提供的)任务,并在那里检查结果。如果匹配,补全源将设置一个结果(如果没有匹配,则根本不会设置结果)。

然后,代码将等待最先发生的事情:要么所有延续任务完成,要么设置任务完成结果。无论哪种方式,我们都准备好检查与任务完成源关联的任务的结果(这就是为什么我们等待 延续任务 完成,而不是原始任务),如果它是设置,这在很大程度上表明我们有匹配项(最后的额外检查有点偏执,但我想安全总比抱歉好......:D)

public static async Task<bool> WhenAnyHasResult<T>(Predicate<T> isExpectedResult, params Task<T>[] tasks)
{
    const TaskContinuationOptions continuationTaskFlags = TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.AttachedToParent;
         
    // Prepare TaskCompletionSource to be set only when one of the provided tasks
    // completes with expected result
    var tcs = new TaskCompletionSource<T>();

    // For every provided task, attach a continuation task that fires
    // once the original task was completed
    var taskContinuations = tasks.Select(task =>
    {
        return task.ContinueWith(x =>
        {
            var taskResult = x.Result;
            if (isExpectedResult(taskResult))
            {
                tcs.SetResult(taskResult);
            }
        },
        continuationTaskFlags);
    });

    // We either wait for all the continuation tasks to be completed 
    // (it's most likely an indication that none of the provided tasks completed with the expected result)
    // or for the TCS task to complete (which means a failure)
    await Task.WhenAny(Task.WhenAll(taskContinuations), tcs.Task);

    // If the task from TCS has run to completion, it means the result has been set from
    // the continuation task attached to one of the tasks provided in the arguments
    var completionTask = tcs.Task;
    if (completionTask.IsCompleted)
    {
        // We will check once more to make sure the result is set as expected 
        // and return this as our outcome
        var tcsResult = completionTask.Result;
        return isExpectedResult(tcsResult);
    }

    // TCS result was never set, which means we did not find a task matching the expected result.
    tcs.SetCanceled();
    return false;
}

现在,用法如下:

static async Task ExampleWithBooleans()
{
    Console.WriteLine("Example with booleans");

    var task1 = SampleTask(3000, true);
    var task2 = SampleTask(5000, false);

    var finalResult = await TaskUtils.WhenAnyHasResult(result => result == true, task1, task2);

    // go ahead and cancel your cancellation token here

    Console.WriteLine("Final result: " + finalResult);
    Debug.Assert(finalResult == true);
    Console.WriteLine();
}

将它放入泛型方法的好处在于,作为原始任务的结果,它适用于任何类型,而不仅仅是布尔值。

您可以使用异步方法将 Task<bool> 包装到另一个 Task<bool>,并在输入任务的结果为 true 时取消 CancellationTokenSource。在下面的示例中,此方法是 IfTrueCancel,它被实现为 local function. This way it captures CancellationTokenSource,因此您不必在每次调用时都将其作为参数传递:

var cts = new CancellationTokenSource();
var stoppingToken = cts.Token;

var task1 = IfTrueCancel(Task.Run(() => Task1(stoppingToken)));
var task2 = IfTrueCancel(Task.Run(() => Task2(stoppingToken)));
var task3 = IfTrueCancel(Task.Run(() => Task3(stoppingToken)));
var task4 = IfTrueCancel(Task.Run(() => Task4(stoppingToken)));
var task5 = IfTrueCancel(Task.Run(() => Task5(stoppingToken)));
var task6 = IfTrueCancel(Task.Run(() => Task6(stoppingToken)));

Task.WaitAll(task1, task2, task3, task4, task5, task6);

async Task<bool> IfTrueCancel(Task<bool> task)
{
    bool result = await task.ConfigureAwait(false);
    if (result) cts.Cancel();
    return result;
}

这个问题的另一种完全不同的解决方案是使用 PLINQ instead of explicitly created Tasks. PLINQ requires an IEnumerable of something in order to do parallel work on it, and in your case this something is the Task1, Task2 etc functions that you want to invoke. You could put them in an array of Func<CancellationToken, bool>,并以这种方式解决问题:

var functions = new Func<CancellationToken, bool>[]
{
    Task1, Task2, Task3, Task4, Task5, Task6
};

bool success = functions
    .AsParallel()
    .WithDegreeOfParallelism(4)
    .Select(function =>
    {
        try
        {
            bool result = function(stoppingToken);
            if (result) cts.Cancel();
            return result;
        }
        catch (OperationCanceledException)
        {
            return false;
        }
    })
    .Any(result => result);

这种方式的优点是可以配置并行度,不必依赖ThreadPool可用性来限制整个操作的并发度。缺点是所有函数都应该有相同的签名。您可以通过像这样将函数声明为 lambda 表达式来克服这个缺点:

var functions = new Func<CancellationToken, bool>[]
{
    ct => Task1(arg1, ct),
    ct => Task2(arg1, arg2, ct),
    ct => Task3(ct),
    ct => Task4(arg1, arg2, arg3, ct),
    ct => Task5(arg1, ct),
    ct => Task6(ct)
};