TPL.Dataflow - 防止在 ActionBlock 中发生未处理的异常时挂起<T>
TPL.Dataflow - preventing hangs when unhanded exception occurs in ActionBlock<T>
刚从 System.Threading.Tasks.Dataflow
开始,不确定我是否理解 ActionBlock
中未处理异常的正确错误处理技术。
我现在有什么导致挂起:
- ActionBlock
有未处理的异常,不再处理
- 生产者无法完成,因为它已经结束 BoundedCapacity
这是我的代码(已简化以显示一位消费者)。
internal class Program
{
private static int _processCounter = 0;
internal class MyClass
{
public MyClass(int id)
{
this.Id = id;
}
internal int Id { get; set; }
}
private static void Main(string[] args)
{
BufferBlock<MyClass> queue = new BufferBlock<MyClass>(new DataflowBlockOptions {BoundedCapacity = 10,});
ActionBlock<MyClass> consumer =
new ActionBlock<MyClass>(record => Process(record),
new ExecutionDataflowBlockOptions {BoundedCapacity = 1,});
queue.LinkTo(consumer, new DataflowLinkOptions {PropagateCompletion = true,});
Task producer = Produce(queue);
Trace.TraceInformation("Starting to wait on producer and consumer...");
Task.WhenAll(producer, consumer.Completion).Wait(); // <-- this will hang. consumer.Completion is faulted, but producer is still "running".
}
private static async Task Produce(BufferBlock<MyClass> queue)
{
for (int i = 0; i < 20; i++)
{
await queue.SendAsync(new MyClass(i));
Trace.TraceInformation("Sending object number {0}", i);
await Task.Delay(1);
}
Trace.TraceInformation("Completing the producer");
queue.Complete();
// <-- we never get here because one of the SendAsync will be waiting to not excede BoundedCapacity = 10
}
private static void Process(MyClass myClass)
{
int counter = Interlocked.Increment(ref _processCounter);
Trace.TraceInformation("Processing object number {0}", myClass.Id);
if (counter > 4)
{
Trace.TraceInformation("About to throw exception for object {0}", myClass.Id);
throw new ArgumentException("Something bad happened");
}
}
}
输出:
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 0
ConsoleApplication5.vshost.exe Information: 0 : Starting to wait on producer and consumer...
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 1
ConsoleApplication5.vshost.exe Information: 0 : Processing object number 0
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 2
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 3
ConsoleApplication5.vshost.exe Information: 0 : Processing object number 1
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 4
ConsoleApplication5.vshost.exe Information: 0 : Processing object number 2
ConsoleApplication5.vshost.exe Information: 0 : Processing object number 3
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 5
ConsoleApplication5.vshost.exe Information: 0 : Processing object number 4
ConsoleApplication5.vshost.exe Information: 0 : About to throw exception for object 4
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 6
A first chance exception of type 'System.ArgumentException' occurred in ConsoleApplication5.exe
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 7
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 8
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 9
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 10
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 11
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 12
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 13
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 14
<never finishes>
问题是,等待此类执行以确保其完成或传播异常的正确方法是什么。
谢谢!
您可以做很多事情,因为这与您如何构建代码有关。最简单的可能是对生产者使用CancellationToken
并先等待消费者:
private static void Main(string[] args)
{
// ...
var cts = new CancellationTokenSource();
Task producer = Produce(queue, cts.Token);
Trace.TraceInformation("Starting to wait on producer and consumer...");
try
{
await consumer.Completion;
}
catch
{
cts.Cancel();
// handle
}
try
{
await producer
}
catch
{
// handle
}
}
private static async Task Produce(BufferBlock<MyClass> queue, CancellationToken token)
{
for (int i = 0; i < 20; i++)
{
await queue.SendAsync(new MyClass(i), token);
Trace.TraceInformation("Sending object number {0}", i);
await Task.Delay(1);
}
Trace.TraceInformation("Completing the producer");
queue.Complete();
}
刚从 System.Threading.Tasks.Dataflow
开始,不确定我是否理解 ActionBlock
中未处理异常的正确错误处理技术。
我现在有什么导致挂起:
- ActionBlock
有未处理的异常,不再处理
- 生产者无法完成,因为它已经结束 BoundedCapacity
这是我的代码(已简化以显示一位消费者)。
internal class Program
{
private static int _processCounter = 0;
internal class MyClass
{
public MyClass(int id)
{
this.Id = id;
}
internal int Id { get; set; }
}
private static void Main(string[] args)
{
BufferBlock<MyClass> queue = new BufferBlock<MyClass>(new DataflowBlockOptions {BoundedCapacity = 10,});
ActionBlock<MyClass> consumer =
new ActionBlock<MyClass>(record => Process(record),
new ExecutionDataflowBlockOptions {BoundedCapacity = 1,});
queue.LinkTo(consumer, new DataflowLinkOptions {PropagateCompletion = true,});
Task producer = Produce(queue);
Trace.TraceInformation("Starting to wait on producer and consumer...");
Task.WhenAll(producer, consumer.Completion).Wait(); // <-- this will hang. consumer.Completion is faulted, but producer is still "running".
}
private static async Task Produce(BufferBlock<MyClass> queue)
{
for (int i = 0; i < 20; i++)
{
await queue.SendAsync(new MyClass(i));
Trace.TraceInformation("Sending object number {0}", i);
await Task.Delay(1);
}
Trace.TraceInformation("Completing the producer");
queue.Complete();
// <-- we never get here because one of the SendAsync will be waiting to not excede BoundedCapacity = 10
}
private static void Process(MyClass myClass)
{
int counter = Interlocked.Increment(ref _processCounter);
Trace.TraceInformation("Processing object number {0}", myClass.Id);
if (counter > 4)
{
Trace.TraceInformation("About to throw exception for object {0}", myClass.Id);
throw new ArgumentException("Something bad happened");
}
}
}
输出:
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 0
ConsoleApplication5.vshost.exe Information: 0 : Starting to wait on producer and consumer...
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 1
ConsoleApplication5.vshost.exe Information: 0 : Processing object number 0
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 2
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 3
ConsoleApplication5.vshost.exe Information: 0 : Processing object number 1
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 4
ConsoleApplication5.vshost.exe Information: 0 : Processing object number 2
ConsoleApplication5.vshost.exe Information: 0 : Processing object number 3
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 5
ConsoleApplication5.vshost.exe Information: 0 : Processing object number 4
ConsoleApplication5.vshost.exe Information: 0 : About to throw exception for object 4
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 6
A first chance exception of type 'System.ArgumentException' occurred in ConsoleApplication5.exe
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 7
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 8
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 9
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 10
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 11
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 12
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 13
ConsoleApplication5.vshost.exe Information: 0 : Sending object number 14
<never finishes>
问题是,等待此类执行以确保其完成或传播异常的正确方法是什么。 谢谢!
您可以做很多事情,因为这与您如何构建代码有关。最简单的可能是对生产者使用CancellationToken
并先等待消费者:
private static void Main(string[] args)
{
// ...
var cts = new CancellationTokenSource();
Task producer = Produce(queue, cts.Token);
Trace.TraceInformation("Starting to wait on producer and consumer...");
try
{
await consumer.Completion;
}
catch
{
cts.Cancel();
// handle
}
try
{
await producer
}
catch
{
// handle
}
}
private static async Task Produce(BufferBlock<MyClass> queue, CancellationToken token)
{
for (int i = 0; i < 20; i++)
{
await queue.SendAsync(new MyClass(i), token);
Trace.TraceInformation("Sending object number {0}", i);
await Task.Delay(1);
}
Trace.TraceInformation("Completing the producer");
queue.Complete();
}