TPL 数据流:如何限制整个管道?
TPL Dataflow : How to throttle an entire pipeline?
我想限制在 Dataflow 管道中 posted 的项目数。项目的数量取决于生产环境。
这些对象消耗大量内存(图像),因此我想在管道的最后一个块完成其工作时 post 它们。
我尝试使用 SemaphoreSlim
来限制生产者并在管道的最后一个块中释放它。可以,但是如果过程中抛出异常,程序会一直等待,异常不会被拦截。
这是一个看起来像我们的代码的示例。
我怎样才能做到这一点 ?
static void Main(string[] args)
{
SemaphoreSlim semaphore = new SemaphoreSlim(1, 2);
var downloadString = new TransformBlock<string, string>(uri =>
{
Console.WriteLine("Downloading '{0}'...", uri);
return new WebClient().DownloadString(uri);
});
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
char[] tokens = text.ToArray();
for (int i = 0; i < tokens.Length; i++)
{
if (!char.IsLetter(tokens[i]))
tokens[i] = ' ';
}
text = new string(tokens);
return text.Split(new char[] { ' ' },
StringSplitOptions.RemoveEmptyEntries);
});
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
throw new InvalidOperationException("ouch !"); // explicit for test
return words.Where(word => word.Length > 3).OrderBy(word => word)
.Distinct().ToArray();
});
var findPalindromes = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Finding palindromes...");
var palindromes = new ConcurrentQueue<string>();
Parallel.ForEach(words, word =>
{
string reverse = new string(word.Reverse().ToArray());
if (Array.BinarySearch<string>(words, reverse) >= 0 &&
word != reverse)
{
palindromes.Enqueue(word);
}
});
return palindromes.ToArray();
});
var printPalindrome = new ActionBlock<string[]>(palindromes =>
{
try
{
foreach (string palindrome in palindromes)
{
Console.WriteLine("Found palindrome {0}/{1}",
palindrome, new string(palindrome.Reverse().ToArray()));
}
}
finally
{
semaphore.Release();
}
});
downloadString.LinkTo(createWordList);
createWordList.LinkTo(filterWordList);
filterWordList.LinkTo(findPalindromes);
findPalindromes.LinkTo(printPalindrome);
downloadString.Completion.ContinueWith(t =>
{
if (t.IsFaulted)
((IDataflowBlock)createWordList).Fault(t.Exception);
else createWordList.Complete();
});
createWordList.Completion.ContinueWith(t =>
{
if (t.IsFaulted)
((IDataflowBlock)filterWordList).Fault(t.Exception);
else filterWordList.Complete();
});
filterWordList.Completion.ContinueWith(t =>
{
if (t.IsFaulted)
((IDataflowBlock)findPalindromes).Fault(t.Exception); // enter here when an exception throws
else findPalindromes.Complete();
});
findPalindromes.Completion.ContinueWith(t =>
{
if (t.IsFaulted)
((IDataflowBlock)printPalindrome).Fault(t.Exception); // the fault is propagated here but not catched
else printPalindrome.Complete();
});
try
{
for (int i = 0; i < 10; i++)
{
Console.WriteLine(i);
downloadString.Post("http://www.google.com");
semaphore.Wait(); // waits here when an exception throws
}
downloadString.Complete();
printPalindrome.Completion.Wait();
}
catch (AggregateException agg)
{
Console.WriteLine("An error has occured : " + agg);
}
Console.WriteLine("Done");
Console.ReadKey();
}
您应该同时等待信号量和完成任务。这样,如果块过早结束(通过异常或取消),那么异常将被重新抛出,否则你将等待你的信号量,直到有空间 post 更多。
您可以使用 Task.WhenAny
和 SemaphoreSlim.WaitAsync
:
for (int i = 0; i < 10; i++)
{
Console.WriteLine(i);
downloadString.Post("http://www.google.com");
if (printPalindrome.Completion.IsCompleted)
{
break;
}
Task.WhenAny(semaphore.WaitAsync(), printPalindrome.Completion).Wait();
}
注意:使用 Task.Wait
仅适用于这种情况,因为它是 Main
。通常这应该是一个 async
方法,你应该 await
从 Task.WhenAny
.
返回的任务
这就是我处理限制或在任何时候只允许源块中有 10 个项目的方式。您可以将其修改为 1。确保您还限制了管道中的任何其他块,否则,您可以获得 1 的源块和更多的下一个块。
var sourceBlock = new BufferBlock<string>(
new ExecutionDataflowBlockOptions() {
SingleProducerConstrained = true,
BoundedCapacity = 10 });
然后生产者这样做:
sourceBlock.SendAsync("value", shutdownToken).Wait(shutdownToken);
如果您使用异步/等待,只需等待 SendAsync 调用。
我想限制在 Dataflow 管道中 posted 的项目数。项目的数量取决于生产环境。 这些对象消耗大量内存(图像),因此我想在管道的最后一个块完成其工作时 post 它们。
我尝试使用 SemaphoreSlim
来限制生产者并在管道的最后一个块中释放它。可以,但是如果过程中抛出异常,程序会一直等待,异常不会被拦截。
这是一个看起来像我们的代码的示例。 我怎样才能做到这一点 ?
static void Main(string[] args)
{
SemaphoreSlim semaphore = new SemaphoreSlim(1, 2);
var downloadString = new TransformBlock<string, string>(uri =>
{
Console.WriteLine("Downloading '{0}'...", uri);
return new WebClient().DownloadString(uri);
});
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
char[] tokens = text.ToArray();
for (int i = 0; i < tokens.Length; i++)
{
if (!char.IsLetter(tokens[i]))
tokens[i] = ' ';
}
text = new string(tokens);
return text.Split(new char[] { ' ' },
StringSplitOptions.RemoveEmptyEntries);
});
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
throw new InvalidOperationException("ouch !"); // explicit for test
return words.Where(word => word.Length > 3).OrderBy(word => word)
.Distinct().ToArray();
});
var findPalindromes = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Finding palindromes...");
var palindromes = new ConcurrentQueue<string>();
Parallel.ForEach(words, word =>
{
string reverse = new string(word.Reverse().ToArray());
if (Array.BinarySearch<string>(words, reverse) >= 0 &&
word != reverse)
{
palindromes.Enqueue(word);
}
});
return palindromes.ToArray();
});
var printPalindrome = new ActionBlock<string[]>(palindromes =>
{
try
{
foreach (string palindrome in palindromes)
{
Console.WriteLine("Found palindrome {0}/{1}",
palindrome, new string(palindrome.Reverse().ToArray()));
}
}
finally
{
semaphore.Release();
}
});
downloadString.LinkTo(createWordList);
createWordList.LinkTo(filterWordList);
filterWordList.LinkTo(findPalindromes);
findPalindromes.LinkTo(printPalindrome);
downloadString.Completion.ContinueWith(t =>
{
if (t.IsFaulted)
((IDataflowBlock)createWordList).Fault(t.Exception);
else createWordList.Complete();
});
createWordList.Completion.ContinueWith(t =>
{
if (t.IsFaulted)
((IDataflowBlock)filterWordList).Fault(t.Exception);
else filterWordList.Complete();
});
filterWordList.Completion.ContinueWith(t =>
{
if (t.IsFaulted)
((IDataflowBlock)findPalindromes).Fault(t.Exception); // enter here when an exception throws
else findPalindromes.Complete();
});
findPalindromes.Completion.ContinueWith(t =>
{
if (t.IsFaulted)
((IDataflowBlock)printPalindrome).Fault(t.Exception); // the fault is propagated here but not catched
else printPalindrome.Complete();
});
try
{
for (int i = 0; i < 10; i++)
{
Console.WriteLine(i);
downloadString.Post("http://www.google.com");
semaphore.Wait(); // waits here when an exception throws
}
downloadString.Complete();
printPalindrome.Completion.Wait();
}
catch (AggregateException agg)
{
Console.WriteLine("An error has occured : " + agg);
}
Console.WriteLine("Done");
Console.ReadKey();
}
您应该同时等待信号量和完成任务。这样,如果块过早结束(通过异常或取消),那么异常将被重新抛出,否则你将等待你的信号量,直到有空间 post 更多。
您可以使用 Task.WhenAny
和 SemaphoreSlim.WaitAsync
:
for (int i = 0; i < 10; i++)
{
Console.WriteLine(i);
downloadString.Post("http://www.google.com");
if (printPalindrome.Completion.IsCompleted)
{
break;
}
Task.WhenAny(semaphore.WaitAsync(), printPalindrome.Completion).Wait();
}
注意:使用 Task.Wait
仅适用于这种情况,因为它是 Main
。通常这应该是一个 async
方法,你应该 await
从 Task.WhenAny
.
这就是我处理限制或在任何时候只允许源块中有 10 个项目的方式。您可以将其修改为 1。确保您还限制了管道中的任何其他块,否则,您可以获得 1 的源块和更多的下一个块。
var sourceBlock = new BufferBlock<string>(
new ExecutionDataflowBlockOptions() {
SingleProducerConstrained = true,
BoundedCapacity = 10 });
然后生产者这样做:
sourceBlock.SendAsync("value", shutdownToken).Wait(shutdownToken);
如果您使用异步/等待,只需等待 SendAsync 调用。