多个消费者使用相同的值

Multiple consumers working with the same value

我希望得到一些针对某种 producer/consumer 场景的概念性建议。

假设我有一个 "producer" 线程,它通常以相当周期性的方式创建一些整数或双精度值,但两者之间可能存在任意延迟。

现在有几个 "consumer" 线程,一旦值到达,所有线程都应该开始并行处理这个值。

这些 "consumer" 线程中的每一个都可能需要额外的时间来完成其任务。一旦 "consumer" 就绪,它应该等待下一个产生的值。

但是,如果用之前的值完成任务花费的时间太长,消费者应立即继续使用到期的任何值(如果有)。我不需要队列,如果在消费者工作时有几个到达,则可能会跳过值。因此,是否有新值是唯一重要的,如果任何消费者准备好消费下一个值,哪个是最新的。

除了每个 "consumer" 有一个 AutoResetEvent/ManualResetEvent 之外,还有其他可行的方法吗?

具体解决方案应该在Unity3D中工作,所以需要Mono2。

编辑:由于我对概念性建议感兴趣,因此很难提出一些源代码。我希望下面的内容能说明问题。

int data = 0;

producer = new Timer(20, OnTimer);
consumer1 = new Consumer(OnConsume1);
consumer2 = new Consumer(OnConsume2);

OnTimer()
{
        data = data + 20;
        TriggerConsumers();
}

OnConsume1()
{
        while (running)
        {
                WaitForData();
                // do something with data
                Thread.Sleep(10);
        }
}

OnConsume2()
{
        while (running)
        {
                WaitForData();
                // do something with data
                Thread.Sleep(30);
        }
}

生产者每 20 毫秒创建一个新值。然后有两个消费者(以后可能会更多)等待这个值并用它做一些事情。一个消费者需要 10 毫秒,另一个需要 30 毫秒。 如果 producer/consumers 同时开始,这将导致以下时间线:

20 数据 = 20 => OnConsume1,OnConsume2 运行 数据 = 20
30 OnConsume1会等待数据,OnConsume2是"working"
40 数据 = 40 => OnConsume1 运行 数据 = 40,OnConsume2 仍然 "working"
50 OnConsume1 将等待,OnConsume2 将 运行 数据 = 40
60 数据 = 60 => OnConsume1 运行 数据 = 60,OnConsume2 为 "working"
70 OnConsume1 会等待,OnConsume2 仍然"working"
80 data = 80 => OnConsume1 => data = 80,OnConsume2 也应该 运行 与 80(忽略它可能与 60 一起工作的比赛)

听起来如果您想要多个可用值且具有最大深度,您可能需要堆栈/队列。或单个可为 null 的值,例如。

public int? value;

OnConsume1()
{
        while (running)
        {
                int myValue;
                if(value != null) // or value.HasValue
                {
                    myValue = value.Value; // or (int)value
                    value = null; // let other consumers know the value is being processed
                    // do something with data
                }
                Thread.Sleep(10);
        }
}

您需要使用 C# 的 lock 功能来保持一次只有一个线程修改值。

如果要使用线程安全容器,check out these