The unit of work problem in TPL Dataflow Pipeline
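I have a classic producer-consumer problem, where multiple users can simultaneously POST data to a Web API method (api/test), which asynchronously triggers an IO-intensive, long-running operation. I'm using an ActionBlock<T> linked to a BufferBlock<T> to limit the number of concurrent requests to 5.
The Producer class is registered as a singleton, and the goal is to let all calls to api/test feed into this one queue. This means that things like completing the block are not an option.
What is the most efficient way for the controller to wait for the work it started to complete?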
Web API controller:
using Microsoft.AspNetCore.Mvc;
using System.Threading.Tasks;
[Route("api/test")]
[ApiController]
public class TestController : ControllerBase
{
    private readonly Producer producer;
    public TestController(Producer producer)
    {
        this.producer = producer;
    }
    [HttpGet]
    public async Task<string[]> Values()
    {
        for (int i = 1; i <= 10; i++)
        {
            await this.producer.AddAsync(1);
        }
        // I've added my work to the queue; elegant completion required
        return new string[] { "value1", "value2" };
    }
}
Producer/consumer implementation:
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
public class Producer
{
    private BufferBlock<int> queue;
    private ActionBlock<int> consumer;
    // Up to 5 items run at once, so the results collection must be thread-safe (List<T> is not)
    public ConcurrentQueue<int> results = new ConcurrentQueue<int>();
    private void InitializeChain()
    {
        queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5, });
        var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 };
        consumer = new ActionBlock<int>(x =>
        {
            Thread.Sleep(5000); // simulates the long-running operation (blocks a thread-pool thread)
            Debug.WriteLine(x + " " + Thread.CurrentThread.ManagedThreadId);
            results.Enqueue(x);
        }, consumerOptions);
        queue.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });
    }
    public async Task AddAsync(int data)
    {
        await queue.SendAsync(data);
    }
    public Producer()
    {
        this.InitializeChain();
    }
}
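There are many approaches and synchronization primitives you could use to solve this. Each has its own benefits, fault tolerance and problems, depending on your needs. Here is an awaitable example using a TaskCompletionSource.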
Given
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
public class Producer
{
    private BufferBlock<int> _queue;
    private ActionBlock<int> _consumer;
    // Raised with (input, result) when an item finishes; as an event, += and -= are thread-safe
    public event Action<int, string> OnResult;
    public Producer()
    {
        InitializeChain();
    }
    private void InitializeChain()
    {
        _queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
        var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 };
        _consumer = new ActionBlock<int>(SomeIoWorkAsync, consumerOptions);
        _queue.LinkTo(_consumer, new DataflowLinkOptions { PropagateCompletion = true });
    }
    private async Task SomeIoWorkAsync(int x)
    {
        Console.WriteLine($"{DateTime.Now.TimeOfDay:g} : Processing {x}");
        await Task.Delay(5000); // simulates the IO-bound work
        OnResult?.Invoke(x, $"SomeResult {x}");
    }
    public Task AddAsync(int data) => _queue.SendAsync(data);
}
Waiting for the result
You could easily refactor this to do the send and the wait in a single call.
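public static Task<string> WaitForConsumerAsync(Producer producer, int myId)
{
    // RunContinuationsAsynchronously keeps the awaiting code from running inline on the consumer's thread
    var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
    Action<int, string> handler = null;
    handler = (id, result) =>
    {
        if (id != myId)
            return;
        producer.OnResult -= handler; // unsubscribe, otherwise every call leaks a handler
        tcs.TrySetResult(result);
    };
    producer.OnResult += handler;
    return tcs.Task;
}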
Usage
var producer = new Producer();
// simulates the thing you are waiting for: an id, or whatever
var myId = 7;
// you could send and await in the same method if needed; this is just an example
var task = WaitForConsumerAsync(producer, myId);
// create some work so the bounded capacity fills up;
// run it as a task so we don't hit the back pressure before we await (just for this test)
_ = Task.Run(async () =>
{
    for (int i = 1; i <= 20; i++)
        await producer.AddAsync(i);
});
// wait for your result to pop out
var result = await task;
Console.WriteLine($"{DateTime.Now.TimeOfDay:g} : Got my result {result}, now i can finish happily");
// you can happily end here; the pipeline will keep going
Console.ReadKey();
Output
12:04:41.62464 : Processing 3
12:04:41.6246489 : Processing 1
12:04:41.6246682 : Processing 2
12:04:41.624641 : Processing 4
12:04:41.624661 : Processing 5
12:04:41.8530723 : Processing 7
12:04:41.8530791 : Processing 8
12:04:41.8531427 : Processing 10
12:04:41.8530716 : Processing 6
12:04:41.8530967 : Processing 9
12:04:42.0531947 : Got my result SomeResult 7, now i can finish happily
12:04:42.0532178 : Processing 11
12:04:42.0532453 : Processing 12
12:04:42.0532721 : Processing 14
12:04:42.0532533 : Processing 13
12:04:42.2674406 : Processing 15
12:04:42.2709914 : Processing 16
12:04:42.2713017 : Processing 18
12:04:42.2710417 : Processing 17
12:04:42.4689852 : Processing 19
12:04:42.4721405 : Processing 20
Note: you may need to play around with this example so that it doesn't time out.
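An event-based wait can hang forever if the matching result never arrives. This can happen when an id was never queued, or when the result was raised before anyone subscribed. One option is to bound the await. This is a minimal sketch that assumes .NET 6 or later, where Task<TResult>.WaitAsync(TimeSpan) is available. neverCompletes is a hypothetical stand-in for the task returned by WaitForConsumerAsync.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a WaitForConsumerAsync(...) task whose result never arrives
Task<string> neverCompletes = new TaskCompletionSource<string>().Task;

string outcome;
try
{
    // WaitAsync (.NET 6+) throws TimeoutException if the task hasn't finished in time.
    // It only stops the waiting; it does not cancel the underlying work.
    outcome = await neverCompletes.WaitAsync(TimeSpan.FromMilliseconds(200));
}
catch (TimeoutException)
{
    outcome = "timed out";
}
Console.WriteLine(outcome);
```

In a controller, the WaitAsync(TimeSpan, CancellationToken) overload could also take HttpContext.RequestAborted, so the wait ends when the client disconnects.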
Example of doing it all in one call
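public async Task<string> AddAsync(int data)
{
    // Subscribe before sending, so a fast consumer can't raise the result before anyone is listening
    var resultTask = WaitForConsumerAsync(data);
    await _queue.SendAsync(data);
    return await resultTask;
}
public Task<string> WaitForConsumerAsync(int data)
{
    var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
    Action<int, string> handler = null;
    handler = (id, result) =>
    {
        if (id != data)
            return;
        OnResult -= handler; // unsubscribe once our result has arrived
        tcs.TrySetResult(result);
    };
    OnResult += handler;
    return tcs.Task;
}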
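This variation assumes you are free to change what flows through the pipeline. Instead of using an event to match results to callers by id, each work item carries its own TaskCompletionSource. That removes the subscribe/unsubscribe bookkeeping. It also keeps working when several callers queue the same value, as the controller's AddAsync(1) loop does. Matching by value cannot tell those callers apart. This is a minimal sketch: ProcessAsync is a hypothetical helper name, and Task.Delay stands in for the real IO work.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Each work item carries its own TaskCompletionSource, so results need no id matching or event handlers
var consumer = new ActionBlock<(int Value, TaskCompletionSource<string> Tcs)>(async item =>
{
    try
    {
        await Task.Delay(100); // stands in for the long-running IO work
        item.Tcs.TrySetResult($"SomeResult {item.Value}");
    }
    catch (Exception ex)
    {
        item.Tcs.TrySetException(ex); // hand failures to the waiting caller instead of faulting the block
    }
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 });

// Hypothetical helper: queue one value and await that item's own result
async Task<string> ProcessAsync(int value)
{
    var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
    if (!await consumer.SendAsync((Value: value, Tcs: tcs)))
        throw new InvalidOperationException("The block declined the item (has it been completed?).");
    return await tcs.Task;
}

// Roughly what the controller action could do: queue its 10 items and await all of them
string[] results = await Task.WhenAll(Enumerable.Range(1, 10).Select(i => ProcessAsync(i)));
Console.WriteLine(string.Join(", ", results));
```

Awaiting Task.WhenAll keeps the request open until all ten items have been processed. When the pipeline is busy, the BoundedCapacity back pressure is therefore felt directly by the callers.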
Additional notes
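This is really just an academic example of an awaitable event. I'm assuming your pipeline is more complex than the example given, that you're combining CPU-bound and IO-bound workloads, and that you actually need the BufferBlock. In this example it is redundant.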
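The BufferBlock is redundant because the ActionBlock's own BoundedCapacity already provides the back pressure. That capacity counts both queued items and items being processed. This minimal sketch drops the BufferBlock and shows the difference between Post and SendAsync once the block is full. The gate is a hypothetical device that keeps work in flight, so the capacity checks are deterministic.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical gate: every item stays in flight until we open it, which makes the checks below deterministic
var gate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
int processed = 0;

// No BufferBlock: the ActionBlock's BoundedCapacity (queued + running items) is the back pressure
var consumer = new ActionBlock<int>(async x =>
{
    await gate.Task;                          // stands in for the long-running IO work
    Interlocked.Increment(ref processed);
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5, MaxDegreeOfParallelism = 5 });

int acceptedCount = 0;
for (int i = 0; i < 5; i++)
    if (consumer.Post(i)) acceptedCount++;    // the first 5 items fit

bool sixthPosted = consumer.Post(5);          // Post never waits: the block is full, so it returns false
Task<bool> sixthSent = consumer.SendAsync(5); // SendAsync instead waits asynchronously for a free slot
bool sentBeforeRelease = sixthSent.IsCompleted;

gate.SetResult();                             // let the in-flight work finish
bool sixthAccepted = await sixthSent;

consumer.Complete();
await consumer.Completion;
Console.WriteLine($"accepted: {acceptedCount}, Post(5): {sixthPosted}, SendAsync(5): {sixthAccepted}, processed: {processed}");
```

This is why SendAsync, as used in AddAsync, is the call that waits for capacity. Post would simply refuse the item.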
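- If you are awaiting purely IO-bound workloads, you are probably better off just awaiting them; there's no need for a pipeline.
- Based on the information you've provided, there's no need to create back pressure with BoundedCapacity unless you have some kind of memory constraint.
- You need to be careful with BoundedCapacity combined with the default EnsureOrdered = true. With ordering on, a finished item that is waiting behind a slower one still occupies a slot. With EnsureOrdered = false the pipeline will be more efficient: jobs pop out as they complete, and back pressure isn't affected by the ordering of the results, so items may move through the pipeline faster.
- You could also use other frameworks, such as Rx, which might make all of this more elegant and fluent.
- You could also gain some efficiency by setting SingleProducerConstrained = true on the ActionBlock's ExecutionDataflowBlockOptions. Your blocks are linear, so the ActionBlock is only ever fed by the BufferBlock.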
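The EnsureOrdered and SingleProducerConstrained points can be seen together in this minimal sketch. It uses a TransformBlock, because as far as I know EnsureOrdered only matters for blocks that produce output, so it makes no difference to the ActionBlock in the question. The toy delays are an assumption chosen so that item 1 finishes last. The collector block has exactly one upstream source, which is the situation where SingleProducerConstrained = true is safe.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Sends items 1..3 through a TransformBlock where item 1 is slowest and item 3 fastest,
// and returns the order in which the results come out
async Task<List<int>> RunAsync(bool ensureOrdered)
{
    var transform = new TransformBlock<int, int>(async x =>
    {
        await Task.Delay((4 - x) * 100); // item 1: 300 ms, item 2: 200 ms, item 3: 100 ms
        return x;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3, EnsureOrdered = ensureOrdered });

    var output = new List<int>();
    // The collector is fed only by the transform block, so it has a single producer
    var collector = new ActionBlock<int>(x => output.Add(x),
        new ExecutionDataflowBlockOptions { SingleProducerConstrained = true });
    transform.LinkTo(collector, new DataflowLinkOptions { PropagateCompletion = true });

    for (int i = 1; i <= 3; i++)
        await transform.SendAsync(i);
    transform.Complete();
    await collector.Completion; // completion propagates from the transform block
    return output;
}

List<int> ordered = await RunAsync(ensureOrdered: true);    // results held back to input order
List<int> unordered = await RunAsync(ensureOrdered: false); // results released as each item finishes
Console.WriteLine($"EnsureOrdered = true:  {string.Join(", ", ordered)}");
Console.WriteLine($"EnsureOrdered = false: {string.Join(", ", unordered)}");
```

With EnsureOrdered = true the output is always 1, 2, 3. With false it should come out roughly in completion order, which is 3, 2, 1 here, although the exact unordered sequence depends on timing.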