具有分离队列的多生产者多消费者数据同步
Multi-Producer Multi-Consumer data synchronization with separated queues
我有以下场景:
- 可变数量(大于三个)的队列(取决于文件中的配置集)
- 这些队列中的一些队列可以提供数据也可以不提供(这取决于通过网络客户端接收数据的生产者:在同一会话期间客户端可以连接也可以不连接)
- 这些队列以不同的速度进给;因此,例如,Queue1 在给定时间可以有 10 个对象,而另一个队列 Queue2 在同一给定时间只能有 3 个对象
- 这些队列中的对象必须根据所有对象共享的 属性 进行同步(一个 int 属性 不断增加的名称 "SSId")
- 同步必须仅针对在给定时刻被馈送数据的队列发生(必须排除未连接的队列)
- 当对象被同步时,它们必须被推送到相关消费者使用的相应输出队列:每个生产者都关联到一个特定的消费者
- 按照上一步,每个消费者都能够同时处理 "SSId" 具有相同 属性 值的排队对象;
- 因此,最终结果应该是一个系统,即使每个生产者都生成数据,消费者也能够以相同的速率处理数据(根据已经提到的 "SSId" 属性 进行同步)在不同 speeds/rates
为了更清楚地说明前面几点中描述的流程的模式:
请注意,SSid 大于 100 的新项目不会推送到消费者队列,因为其他队列中还没有相应的项目。
您能否建议一种使用 .NET TPL 数据流或 Rx.NET 创建这种同步的方法?到目前为止,我一直使用 TPL Dataflow 来实现简单的顺序管道,我希望获得有关如何处理此场景的反馈。
提前感谢您的任何建议。
怎么样
- 将来自所有生产者的对象合并到一个可观察对象中
- 按 SSId 对对象进行分组
- 当组大小等于生产者数量时发出组(通过 .Buffer())
像这样:
var syncedProducers =
// ConnectedProducersEvent ticks an array of connected producers, each time a producer connects or disconnects
ConnectedProducersEvent
.SelectMany(producers =>
Observable
.Merge(producers) // Put all objects, from all producers into the same observable
.GroupBy(@object => @object.SSId) // Group objects by matching SSId
.SelectMany(group => group.Buffer(producers.Length))); // Syncing: Emit the SSId group, when the group count matches the count of connected producers
// Now you can wire syncedProducers to consumers
var consumer1 =
syncedProducers
.Select(x => x.Where(y => y.Producer == 1));
我有以下场景:
- 可变数量(大于三个)的队列(取决于文件中的配置集)
- 这些队列中的一些队列可以提供数据也可以不提供(这取决于通过网络客户端接收数据的生产者:在同一会话期间客户端可以连接也可以不连接)
- 这些队列以不同的速度进给;因此,例如,Queue1 在给定时间可以有 10 个对象,而另一个队列 Queue2 在同一给定时间只能有 3 个对象
- 这些队列中的对象必须根据所有对象共享的 属性 进行同步(一个 int 属性 不断增加的名称 "SSId")
- 同步必须仅针对在给定时刻被馈送数据的队列发生(必须排除未连接的队列)
- 当对象被同步时,它们必须被推送到相关消费者使用的相应输出队列:每个生产者都关联到一个特定的消费者
- 按照上一步,每个消费者都能够同时处理 "SSId" 具有相同 属性 值的排队对象;
- 因此,最终结果应该是一个系统,即使每个生产者都生成数据,消费者也能够以相同的速率处理数据(根据已经提到的 "SSId" 属性 进行同步)在不同 speeds/rates
为了更清楚地说明前面几点中描述的流程的模式:
请注意,SSid 大于 100 的新项目不会推送到消费者队列,因为其他队列中还没有相应的项目。
您能否建议一种使用 .NET TPL 数据流或 Rx.NET 创建这种同步的方法?到目前为止,我一直使用 TPL Dataflow 来实现简单的顺序管道,我希望获得有关如何处理此场景的反馈。 提前感谢您的任何建议。
怎么样
- 将来自所有生产者的对象合并到一个可观察对象中
- 按 SSId 对对象进行分组
- 当组大小等于生产者数量时发出组(通过 .Buffer())
像这样:
var syncedProducers =
// ConnectedProducersEvent ticks an array of connected producers, each time a producer connects or disconnects
ConnectedProducersEvent
.SelectMany(producers =>
Observable
.Merge(producers) // Put all objects, from all producers into the same observable
.GroupBy(@object => @object.SSId) // Group objects by matching SSId
.SelectMany(group => group.Buffer(producers.Length))); // Syncing: Emit the SSId group, when the group count matches the count of connected producers
// Now you can wire syncedProducers to consumers
var consumer1 =
syncedProducers
.Select(x => x.Where(y => y.Producer == 1));