TPL DataFlow 无法处理 ActionBlock 中的异常
TPL DataFlow unable to handle an exception in ActionBlock
我正在尝试从 ActionBlock<int>
向多个同样是 ActionBlock<int>
的消费者发送一条消息的副本。这很好用,但是如果其中一个目标块抛出异常,这似乎不会传播到源块。在这里,我如何尝试处理异常,但它永远不会进入 catch
部分:
static void Main(string[] args)
{
var t1 = new ActionBlock<int>(async i =>
{
await Task.Delay(2000);
Trace.TraceInformation($"target 1 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });
var t2 = new ActionBlock<int>(async i =>
{
await Task.Delay(1000);
Trace.TraceInformation($"target 2 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });
var t3 = new ActionBlock<int>(async i =>
{
await Task.Delay(100);
Trace.TraceInformation($"target 3 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}");
if (i > 5)
throw new Exception("Too big number");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });
var targets = new [] { t1, t2, t3};
var broadcaster = new ActionBlock<int>(
async item =>
{
var processingTasks = targets.Select(async t =>
{
try
{
await t.SendAsync(item);
}
catch
{
Trace.TraceInformation("handled in select"); // never goes here
}
});
try
{
await Task.WhenAll(processingTasks);
}
catch
{
Trace.TraceInformation("handled"); // never goes here
}
});
for (var i = 1; i <= 10; i++)
broadcaster.Post(i);
}
我不确定我在这里遗漏了什么,但我希望能够检索异常以及哪个目标块出现故障。
如果块进入故障状态,它将不再接受新项目,并且它抛出的 Exception
将附加到它的 Completion
任务 and/or 如果链接完成则传播在管道中。要观察 Exception
如果该块拒绝更多项目,您可以 await
完成。
var processingTasks = targets.Select(async t =>
{
try
{
if(!await t.SendAsync(item))
await t.Completion;
}
catch
{
Trace.TraceInformation("handled in select"); // never goes here
}
});
我正在尝试从 ActionBlock<int>
向多个同样是 ActionBlock<int>
的消费者发送一条消息的副本。这很好用,但是如果其中一个目标块抛出异常,这似乎不会传播到源块。在这里,我如何尝试处理异常,但它永远不会进入 catch
部分:
static void Main(string[] args)
{
var t1 = new ActionBlock<int>(async i =>
{
await Task.Delay(2000);
Trace.TraceInformation($"target 1 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });
var t2 = new ActionBlock<int>(async i =>
{
await Task.Delay(1000);
Trace.TraceInformation($"target 2 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });
var t3 = new ActionBlock<int>(async i =>
{
await Task.Delay(100);
Trace.TraceInformation($"target 3 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}");
if (i > 5)
throw new Exception("Too big number");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });
var targets = new [] { t1, t2, t3};
var broadcaster = new ActionBlock<int>(
async item =>
{
var processingTasks = targets.Select(async t =>
{
try
{
await t.SendAsync(item);
}
catch
{
Trace.TraceInformation("handled in select"); // never goes here
}
});
try
{
await Task.WhenAll(processingTasks);
}
catch
{
Trace.TraceInformation("handled"); // never goes here
}
});
for (var i = 1; i <= 10; i++)
broadcaster.Post(i);
}
我不确定我在这里遗漏了什么,但我希望能够检索异常以及哪个目标块出现故障。
如果块进入故障状态,它将不再接受新项目,并且它抛出的 Exception
将附加到它的 Completion
任务 and/or 如果链接完成则传播在管道中。要观察 Exception
如果该块拒绝更多项目,您可以 await
完成。
var processingTasks = targets.Select(async t =>
{
try
{
if(!await t.SendAsync(item))
await t.Completion;
}
catch
{
Trace.TraceInformation("handled in select"); // never goes here
}
});