如何强制 ActionBlock 快速完成

How to force an ActionBlock to complete fast

根据 documentation:

A dataflow block is considered completed when it is not currently processing a message and when it has guaranteed that it will not process any more messages.

这种行为对我来说并不理想。我希望能够随时取消作业,但处理每个单独的操作需要很长时间。所以当我取消令牌时,效果不是立竿见影的。我必须等待当前处理的项目完成。我没有办法直接取消操作,因为我使用的 API 是不可取消的。我可以做任何事情让块忽略当前 运行 操作,并立即完成吗?

这里有一个例子可以说明我的问题。令牌在 500 毫秒后取消,每个动作的持续时间为 1000 毫秒:

static async Task Main()
{
    var cts = new CancellationTokenSource(500);
    var block = new ActionBlock<int>(async x =>
    {
        await Task.Delay(1000);
    }, new ExecutionDataflowBlockOptions() { CancellationToken = cts.Token });
    block.Post(1); // I must wait for this one to complete
    block.Post(2); // This one is ignored
    block.Complete();
    var stopwatch = Stopwatch.StartNew();
    try
    {
        await block.Completion;
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine($"Canceled after {stopwatch.ElapsedMilliseconds} msec");
    }
}

输出:

Canceled after 1035 msec

所需的输出将在 ~500 毫秒后取消。

根据您的评论摘录...:

What I want to happen in case of a cancellation request is to ignore the currently running workitem. I don't care about it any more, so why I have to wait for it?

...假设您 确实 可以离开任务 运行,您可以简单地将要调用的作业包装在 中另一个 Taskconstantly 轮询取消或完成,并取消 that Task。看看下面的 "proof-of-concept" 代码,它将一个 "long-running" 任务包装在另一个任务 "tasked" 中,不断轮询包装的任务是否完成,以及一个用于取消的 CancellationToken(完全 "spur-of-the-moment" status,当然你会想要重新调整一下):

public class LongRunningTaskSource
{
    public Task LongRunning(int milliseconds)
    {
        return Task.Run(() =>
        {
            Console.WriteLine("Starting long running task");
            Thread.Sleep(3000);
            Console.WriteLine("Finished long running task");
        });
    }

    public Task LongRunningTaskWrapper(int milliseconds, CancellationToken token)
    {
        Task task = LongRunning(milliseconds);

        Task wrapperTask = Task.Run(() =>
        {
            while (true)
            {
                //Check for completion (you could, of course, do different things
                //depending on whether it is faulted or completed).
                if (!(task.Status == TaskStatus.Running))
                    break;

                //Check for cancellation.
                if (token.IsCancellationRequested)
                {
                    Console.WriteLine("Aborting Task.");

                    token.ThrowIfCancellationRequested();
                }
            }

        }, token);

        return wrapperTask;
    }
}

使用以下代码:

static void Main()
{
    LongRunningTaskSource longRunning = new LongRunningTaskSource();

    CancellationTokenSource cts = new CancellationTokenSource(1500);

    Task task = longRunning.LongRunningTaskWrapper(3000, cts.Token);

    //Sleep long enough to let things roll on their own.
    Thread.Sleep(5000);

    Console.WriteLine("Ended Main");
}

...产生以下输出:

Starting long running task
Aborting Task.
Exception thrown: 'System.OperationCanceledException' in mscorlib.dll
Finished long running task
Ended Main

包装的任务显然会在自己合适的时间完成。如果您对此没有问题,通常不是这种情况,希望这能满足您的需求。

作为补充示例,运行以下代码(让被包装的任务在超时前完成):

static void Main()
{
    LongRunningTaskSource longRunning = new LongRunningTaskSource();

    CancellationTokenSource cts = new CancellationTokenSource(3000);

    Task task = longRunning.LongRunningTaskWrapper(1500, cts.Token);

    //Sleep long enough to let things roll on their own.
    Thread.Sleep(5000);

    Console.WriteLine("Ended Main");
}

...产生以下输出:

Starting long running task
Finished long running task
Ended Main

因此任务在超时前开始并完成,无需取消任何操作。当然,等待时没有任何阻塞。正如您可能已经知道的那样,当然,如果您 知道 长 运行 代码中在幕后使用的是什么,那么在必要时进行清理会很好。

希望您可以调整此示例以将类似的内容传递给您的 ActionBlock。

免责声明和注意事项

我不熟悉 TPL 数据流库,所以这当然只是一种解决方法。另外,例如,如果您只有一个同步方法调用,您根本没有任何影响,那么您显然需要 两个 任务。一个包装器任务包装同步调用,另一个包装器任务包装包装器任务以包括连续状态轮询和取消检查。