具有分离队列的多生产者多消费者数据同步

Multi-Producer Multi-Consumer data synchronization with separated queues

我有以下场景:

  1. 可变数量(大于三个)的队列(取决于文件中的配置集)
  2. 这些队列中的一些队列可以提供数据也可以不提供(这取决于通过网络客户端接收数据的生产者:在同一会话期间客户端可以连接也可以不连接)
  3. 这些队列以不同的速度进给;因此,例如,Queue1 在给定时间可以有 10 个对象,而另一个队列 Queue2 在同一给定时间只能有 3 个对象
  4. 这些队列中的对象必须根据所有对象共享的 属性 进行同步(一个 int 属性 不断增加的名称 "SSId")
  5. 同步必须仅针对在给定时刻被馈送数据的队列发生(必须排除未连接的队列)
  6. 当对象被同步时,它们必须被推送到相关消费者使用的相应输出队列:每个生产者都关联到一个特定的消费者
  7. 按照上一步,每个消费者都能够同时处理 "SSId" 具有相同 属性 值的排队对象;
  8. 因此,最终结果应该是一个系统,即使每个生产者都生成数据,消费者也能够以相同的速率处理数据(根据已经提到的 "SSId" 属性 进行同步)在不同 speeds/rates

为了更清楚地说明前面几点中描述的流程的模式:

请注意,SSid 大于 100 的新项目不会推送到消费者队列,因为其他队列中还没有相应的项目。

您能否建议一种使用 .NET TPL 数据流或 Rx.NET 创建这种同步的方法?到目前为止,我一直使用 TPL Dataflow 来实现简单的顺序管道,我希望获得有关如何处理此场景的反馈。 提前感谢您的任何建议。

怎么样

  1. 将来自所有生产者的对象合并到一个可观察对象中
  2. 按 SSId 对对象进行分组
  3. 当组大小等于生产者数量时发出组(通过 .Buffer())

像这样:

var syncedProducers = 
    // ConnectedProducersEvent ticks an array of connected producers, each time a producer connects or disconnects
    ConnectedProducersEvent
        .SelectMany(producers => 
            Observable
                .Merge(producers) // Put all objects, from all producers into the same observable
                .GroupBy(@object => @object.SSId) // Group objects by matching SSId
            .SelectMany(group => group.Buffer(producers.Length))); // Syncing: Emit the SSId group, when the group count matches the count of connected producers

// Now you can wire syncedProducers to consumers
var consumer1 = 
    syncedProducers
        .Select(x => x.Where(y => y.Producer == 1));

You can run the example on dotnetfiddle