TransformBlock Items 卡在输出队列中。为什么以及如何解决?
TransformBlock Items get stuck in the output queue. Why and how to fix?
我已经了解了 TPL DataFlow,并且在使用 TrasformBlock
链接到 ActionBlock
的代码中遇到了一个非常恼人的问题。
最终我发现项目卡在 TransformBlock
的输出队列中,因为它的 OutputCount
属性 不断返回高于“0”的值。
这就是整个应用程序陷入僵局的原因。但是,一旦我调用 TransformBlock.TryReceiveAll()
.
,它就会解锁
任何人都可以告诉我是否遗漏了什么或如何防止此类行为?
static void Main()
{
int total = 0;
int itemsProcessing = 0;
TransformBlock<int, Tuple<int, double>> transformBlock = new TransformBlock<int, Tuple<int, double>>(
i => new Tuple<int, double>(i, Math.Sqrt(i)),
new ExecutionDataflowBlockOptions
{
BoundedCapacity = 20,
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
ActionBlock<Tuple<int, double>> outputBlock = new ActionBlock<Tuple<int, double>>(async tuple =>
{
await Task.Delay(1000); // simulating data output delay
Interlocked.Decrement(ref itemsProcessing);
},
new ExecutionDataflowBlockOptions
{
BoundedCapacity = 5,
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
transformBlock.Completion.ContinueWith(t => outputBlock.Complete());
using (Timer timer = new Timer(o =>
{
Console.Title = string.Format(
"{0}: {1}/{2} {3}/{4}/{5}",
Assembly.GetExecutingAssembly().GetName().Name,
Volatile.Read(ref itemsProcessing), Volatile.Read(ref total),
transformBlock.InputCount, transformBlock.OutputCount, outputBlock.InputCount);
}, null, 100, 100))
{
using (transformBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true }))
{
for (int i = 0; i < 40; i++)
{
Thread.Sleep(100); // simulating new item retrieval delay
Interlocked.Increment(ref total);
Interlocked.Increment(ref itemsProcessing);
transformBlock.SendAsync(i).Wait();
}
}
Console.WriteLine("Enqueued");
transformBlock.Complete();
outputBlock.Completion.Wait();
Console.WriteLine("Finish");
timer.Change(Timeout.Infinite, Timeout.Infinite);
timer.Dispose();
}
}
调用 TransformBlock.LinkTo
可让您重新获得一次性注册。当您处理该注册时,块将取消链接。
您的 using
作用域结束得太早,并且块在 TransformBlock
有机会将自身清空到 ActionBlock
之前取消链接,从而阻止它完成。由于第一个块没有完成,下一个块甚至都没有开始完成,更不用说完成了。
将等待移动到 using
块内解决了死锁:
using (transformBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true }))
{
for (int i = 0; i < 40; i++)
{
Thread.Sleep(100); // simulating new item retrieval delay
Interlocked.Increment(ref total);
Interlocked.Increment(ref itemsProcessing);
transformBlock.SendAsync(i).Wait();
}
Console.WriteLine("Enqueued");
transformBlock.Complete();
outputBlock.Completion.Wait();
Console.WriteLine("Finish");
}
附带说明一下,您真的不应该以这种方式阻塞异步代码。使用 async-await 代替 Wait()
、Task.Delay
代替 Thread.Sleep
等会简单得多
此外,由于您使用的是 PropagateCompletion
,因此无需显式调用 outputBlock.Complete()
。
我已经了解了 TPL DataFlow,并且在使用 TrasformBlock
链接到 ActionBlock
的代码中遇到了一个非常恼人的问题。
最终我发现项目卡在 TransformBlock
的输出队列中,因为它的 OutputCount
属性 不断返回高于“0”的值。
这就是整个应用程序陷入僵局的原因。但是,一旦我调用 TransformBlock.TryReceiveAll()
.
任何人都可以告诉我是否遗漏了什么或如何防止此类行为?
static void Main()
{
int total = 0;
int itemsProcessing = 0;
TransformBlock<int, Tuple<int, double>> transformBlock = new TransformBlock<int, Tuple<int, double>>(
i => new Tuple<int, double>(i, Math.Sqrt(i)),
new ExecutionDataflowBlockOptions
{
BoundedCapacity = 20,
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
ActionBlock<Tuple<int, double>> outputBlock = new ActionBlock<Tuple<int, double>>(async tuple =>
{
await Task.Delay(1000); // simulating data output delay
Interlocked.Decrement(ref itemsProcessing);
},
new ExecutionDataflowBlockOptions
{
BoundedCapacity = 5,
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
transformBlock.Completion.ContinueWith(t => outputBlock.Complete());
using (Timer timer = new Timer(o =>
{
Console.Title = string.Format(
"{0}: {1}/{2} {3}/{4}/{5}",
Assembly.GetExecutingAssembly().GetName().Name,
Volatile.Read(ref itemsProcessing), Volatile.Read(ref total),
transformBlock.InputCount, transformBlock.OutputCount, outputBlock.InputCount);
}, null, 100, 100))
{
using (transformBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true }))
{
for (int i = 0; i < 40; i++)
{
Thread.Sleep(100); // simulating new item retrieval delay
Interlocked.Increment(ref total);
Interlocked.Increment(ref itemsProcessing);
transformBlock.SendAsync(i).Wait();
}
}
Console.WriteLine("Enqueued");
transformBlock.Complete();
outputBlock.Completion.Wait();
Console.WriteLine("Finish");
timer.Change(Timeout.Infinite, Timeout.Infinite);
timer.Dispose();
}
}
调用 TransformBlock.LinkTo
可让您重新获得一次性注册。当您处理该注册时,块将取消链接。
您的 using
作用域结束得太早,并且块在 TransformBlock
有机会将自身清空到 ActionBlock
之前取消链接,从而阻止它完成。由于第一个块没有完成,下一个块甚至都没有开始完成,更不用说完成了。
将等待移动到 using
块内解决了死锁:
using (transformBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true }))
{
for (int i = 0; i < 40; i++)
{
Thread.Sleep(100); // simulating new item retrieval delay
Interlocked.Increment(ref total);
Interlocked.Increment(ref itemsProcessing);
transformBlock.SendAsync(i).Wait();
}
Console.WriteLine("Enqueued");
transformBlock.Complete();
outputBlock.Completion.Wait();
Console.WriteLine("Finish");
}
附带说明一下,您真的不应该以这种方式阻塞异步代码。使用 async-await 代替 Wait()
、Task.Delay
代替 Thread.Sleep
等会简单得多
此外,由于您使用的是 PropagateCompletion
,因此无需显式调用 outputBlock.Complete()
。