如何强制 ActionBlock 快速完成
How to force an ActionBlock to complete fast
根据 documentation:
A dataflow block is considered completed when it is not currently processing a message and when it has guaranteed that it will not process any more messages.
这种行为对我来说并不理想。我希望能够随时取消作业,但处理每个单独的操作需要很长时间。所以当我取消令牌时,效果不是立竿见影的。我必须等待当前处理的项目完成。我没有办法直接取消操作,因为我使用的 API 是不可取消的。我可以做任何事情让块忽略当前 运行 操作,并立即完成吗?
这里有一个例子可以说明我的问题。令牌在 500 毫秒后取消,每个动作的持续时间为 1000 毫秒:
static async Task Main()
{
var cts = new CancellationTokenSource(500);
var block = new ActionBlock<int>(async x =>
{
await Task.Delay(1000);
}, new ExecutionDataflowBlockOptions() { CancellationToken = cts.Token });
block.Post(1); // I must wait for this one to complete
block.Post(2); // This one is ignored
block.Complete();
var stopwatch = Stopwatch.StartNew();
try
{
await block.Completion;
}
catch (OperationCanceledException)
{
Console.WriteLine($"Canceled after {stopwatch.ElapsedMilliseconds} msec");
}
}
输出:
Canceled after 1035 msec
所需的输出将在 ~500 毫秒后取消。
根据您的评论摘录...:
What I want to happen in case of a cancellation request is to ignore the currently running workitem. I don't care about it any more, so why I have to wait for it?
...假设您 确实 可以离开任务 运行,您可以简单地将要调用的作业包装在 中另一个 Task
将 constantly 轮询取消或完成,并取消 that Task
。看看下面的 "proof-of-concept" 代码,它将一个 "long-running" 任务包装在另一个任务 "tasked" 中,不断轮询包装的任务是否完成,以及一个用于取消的 CancellationToken(完全 "spur-of-the-moment" status,当然你会想要重新调整一下):
public class LongRunningTaskSource
{
public Task LongRunning(int milliseconds)
{
return Task.Run(() =>
{
Console.WriteLine("Starting long running task");
Thread.Sleep(3000);
Console.WriteLine("Finished long running task");
});
}
public Task LongRunningTaskWrapper(int milliseconds, CancellationToken token)
{
Task task = LongRunning(milliseconds);
Task wrapperTask = Task.Run(() =>
{
while (true)
{
//Check for completion (you could, of course, do different things
//depending on whether it is faulted or completed).
if (!(task.Status == TaskStatus.Running))
break;
//Check for cancellation.
if (token.IsCancellationRequested)
{
Console.WriteLine("Aborting Task.");
token.ThrowIfCancellationRequested();
}
}
}, token);
return wrapperTask;
}
}
使用以下代码:
static void Main()
{
LongRunningTaskSource longRunning = new LongRunningTaskSource();
CancellationTokenSource cts = new CancellationTokenSource(1500);
Task task = longRunning.LongRunningTaskWrapper(3000, cts.Token);
//Sleep long enough to let things roll on their own.
Thread.Sleep(5000);
Console.WriteLine("Ended Main");
}
...产生以下输出:
Starting long running task
Aborting Task.
Exception thrown: 'System.OperationCanceledException' in mscorlib.dll
Finished long running task
Ended Main
包装的任务显然会在自己合适的时间完成。如果您对此没有问题,通常不是这种情况,希望这能满足您的需求。
作为补充示例,运行以下代码(让被包装的任务在超时前完成):
static void Main()
{
LongRunningTaskSource longRunning = new LongRunningTaskSource();
CancellationTokenSource cts = new CancellationTokenSource(3000);
Task task = longRunning.LongRunningTaskWrapper(1500, cts.Token);
//Sleep long enough to let things roll on their own.
Thread.Sleep(5000);
Console.WriteLine("Ended Main");
}
...产生以下输出:
Starting long running task
Finished long running task
Ended Main
因此任务在超时前开始并完成,无需取消任何操作。当然,等待时没有任何阻塞。正如您可能已经知道的那样,当然,如果您 知道 长 运行 代码中在幕后使用的是什么,那么在必要时进行清理会很好。
希望您可以调整此示例以将类似的内容传递给您的 ActionBlock。
免责声明和注意事项
我不熟悉 TPL 数据流库,所以这当然只是一种解决方法。另外,例如,如果您只有一个同步方法调用,您根本没有任何影响,那么您显然需要 两个 任务。一个包装器任务包装同步调用,另一个包装器任务包装包装器任务以包括连续状态轮询和取消检查。
根据 documentation:
A dataflow block is considered completed when it is not currently processing a message and when it has guaranteed that it will not process any more messages.
这种行为对我来说并不理想。我希望能够随时取消作业,但处理每个单独的操作需要很长时间。所以当我取消令牌时,效果不是立竿见影的。我必须等待当前处理的项目完成。我没有办法直接取消操作,因为我使用的 API 是不可取消的。我可以做任何事情让块忽略当前 运行 操作,并立即完成吗?
这里有一个例子可以说明我的问题。令牌在 500 毫秒后取消,每个动作的持续时间为 1000 毫秒:
static async Task Main()
{
var cts = new CancellationTokenSource(500);
var block = new ActionBlock<int>(async x =>
{
await Task.Delay(1000);
}, new ExecutionDataflowBlockOptions() { CancellationToken = cts.Token });
block.Post(1); // I must wait for this one to complete
block.Post(2); // This one is ignored
block.Complete();
var stopwatch = Stopwatch.StartNew();
try
{
await block.Completion;
}
catch (OperationCanceledException)
{
Console.WriteLine($"Canceled after {stopwatch.ElapsedMilliseconds} msec");
}
}
输出:
Canceled after 1035 msec
所需的输出将在 ~500 毫秒后取消。
根据您的评论摘录...:
What I want to happen in case of a cancellation request is to ignore the currently running workitem. I don't care about it any more, so why I have to wait for it?
...假设您 确实 可以离开任务 运行,您可以简单地将要调用的作业包装在 中另一个 Task
将 constantly 轮询取消或完成,并取消 that Task
。看看下面的 "proof-of-concept" 代码,它将一个 "long-running" 任务包装在另一个任务 "tasked" 中,不断轮询包装的任务是否完成,以及一个用于取消的 CancellationToken(完全 "spur-of-the-moment" status,当然你会想要重新调整一下):
public class LongRunningTaskSource
{
public Task LongRunning(int milliseconds)
{
return Task.Run(() =>
{
Console.WriteLine("Starting long running task");
Thread.Sleep(3000);
Console.WriteLine("Finished long running task");
});
}
public Task LongRunningTaskWrapper(int milliseconds, CancellationToken token)
{
Task task = LongRunning(milliseconds);
Task wrapperTask = Task.Run(() =>
{
while (true)
{
//Check for completion (you could, of course, do different things
//depending on whether it is faulted or completed).
if (!(task.Status == TaskStatus.Running))
break;
//Check for cancellation.
if (token.IsCancellationRequested)
{
Console.WriteLine("Aborting Task.");
token.ThrowIfCancellationRequested();
}
}
}, token);
return wrapperTask;
}
}
使用以下代码:
static void Main()
{
LongRunningTaskSource longRunning = new LongRunningTaskSource();
CancellationTokenSource cts = new CancellationTokenSource(1500);
Task task = longRunning.LongRunningTaskWrapper(3000, cts.Token);
//Sleep long enough to let things roll on their own.
Thread.Sleep(5000);
Console.WriteLine("Ended Main");
}
...产生以下输出:
Starting long running task
Aborting Task.
Exception thrown: 'System.OperationCanceledException' in mscorlib.dll
Finished long running task
Ended Main
包装的任务显然会在自己合适的时间完成。如果您对此没有问题,通常不是这种情况,希望这能满足您的需求。
作为补充示例,运行以下代码(让被包装的任务在超时前完成):
static void Main()
{
LongRunningTaskSource longRunning = new LongRunningTaskSource();
CancellationTokenSource cts = new CancellationTokenSource(3000);
Task task = longRunning.LongRunningTaskWrapper(1500, cts.Token);
//Sleep long enough to let things roll on their own.
Thread.Sleep(5000);
Console.WriteLine("Ended Main");
}
...产生以下输出:
Starting long running task
Finished long running task
Ended Main
因此任务在超时前开始并完成,无需取消任何操作。当然,等待时没有任何阻塞。正如您可能已经知道的那样,当然,如果您 知道 长 运行 代码中在幕后使用的是什么,那么在必要时进行清理会很好。
希望您可以调整此示例以将类似的内容传递给您的 ActionBlock。
免责声明和注意事项
我不熟悉 TPL 数据流库,所以这当然只是一种解决方法。另外,例如,如果您只有一个同步方法调用,您根本没有任何影响,那么您显然需要 两个 任务。一个包装器任务包装同步调用,另一个包装器任务包装包装器任务以包括连续状态轮询和取消检查。