识别 TPL 数据流中的同时任务
identifiing the simultaneous tasks in a TPL dataflow
我在 TPL 数据流块中有 1000 个元素,
每个元素都会调用外部网络服务。
网络服务最多支持10个同时调用,
使用以下方法很容易实现:
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10
...
}
Web 服务要求每个调用都传递一个唯一的 ID,以区别于其他同时调用。
理论上这应该是一个 GUID,但实际上第 11 个 GUID 将失败 - 因为服务器上的节流机制识别第一个调用已完成的速度很慢。
供应商建议我们回收 guid,保留 10 个在使用中。
我打算有一个 GUIDS 数组,每个任务将使用 (Interlocked.Increment(ref COUNTER) % 10 ) 作为数组索引
编辑:
我才意识到这行不通!
它假定任务将按顺序完成,但它们可能不会
我可以将其实现为一个 ID 队列,其中每个任务借用一个 returns,但问题仍然存在,是否有更简单的预构建线程安全方法来执行此操作?
(永远不会有足够的调用使 COUNTER 溢出)
但 C#(我是 .net 的新手)让我多次感到惊讶,因为我正在实现一些已经存在的东西。
是否有更好的线程安全方式让每个任务从 ID 池中回收?
创建资源池正是 System.Collections.ConcurrentBag<T>
适用的情况。将其包装在 BlockingCollection<T>
中以使代码更简单。
class Example
{
private readonly BlockingCollection<Guid> _guidPool;
private readonly TransformBlock<Foo, Bar> _transform;
public Example(int concurrentLimit)
{
_guidPool = new BlockingCollection<Guid>(new ConcurrentBag<Guid>(), concurrentLimit)
for(int i = 0: i < concurrentLimit; i++)
{
_guidPool.Add(Guid.NewGuid());
}
_transform = new TransformBlock<Foo, Bar>(() => SomeAction,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = concurrentLimit
//...
});
//...
}
private async Task<Bar> SomeAction(Foo foo)
{
var id= _guidPool.Take();
try
{
//...
}
finally
{
_guidPool.Add(id);
}
}
}
我在 TPL 数据流块中有 1000 个元素, 每个元素都会调用外部网络服务。
网络服务最多支持10个同时调用, 使用以下方法很容易实现:
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10
...
}
Web 服务要求每个调用都传递一个唯一的 ID,以区别于其他同时调用。 理论上这应该是一个 GUID,但实际上第 11 个 GUID 将失败 - 因为服务器上的节流机制识别第一个调用已完成的速度很慢。
供应商建议我们回收 guid,保留 10 个在使用中。
我打算有一个 GUIDS 数组,每个任务将使用 (Interlocked.Increment(ref COUNTER) % 10 ) 作为数组索引
编辑: 我才意识到这行不通! 它假定任务将按顺序完成,但它们可能不会 我可以将其实现为一个 ID 队列,其中每个任务借用一个 returns,但问题仍然存在,是否有更简单的预构建线程安全方法来执行此操作?
(永远不会有足够的调用使 COUNTER 溢出)
但 C#(我是 .net 的新手)让我多次感到惊讶,因为我正在实现一些已经存在的东西。
是否有更好的线程安全方式让每个任务从 ID 池中回收?
创建资源池正是 System.Collections.ConcurrentBag<T>
适用的情况。将其包装在 BlockingCollection<T>
中以使代码更简单。
class Example
{
private readonly BlockingCollection<Guid> _guidPool;
private readonly TransformBlock<Foo, Bar> _transform;
public Example(int concurrentLimit)
{
_guidPool = new BlockingCollection<Guid>(new ConcurrentBag<Guid>(), concurrentLimit)
for(int i = 0: i < concurrentLimit; i++)
{
_guidPool.Add(Guid.NewGuid());
}
_transform = new TransformBlock<Foo, Bar>(() => SomeAction,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = concurrentLimit
//...
});
//...
}
private async Task<Bar> SomeAction(Foo foo)
{
var id= _guidPool.Take();
try
{
//...
}
finally
{
_guidPool.Add(id);
}
}
}