Observable 不对队列在不同线程上更改做出反应

Observable not reacting to queue changed on different thread

我有以下代码:

static void Main()
    {
        var holderQueue = new ConcurrentQueue<int>(GetInitialElements());

        Action<ConcurrentQueue<int>> addToQueueAction = AddToQueue;
        var observableQueue = holderQueue.ToObservable();
        IScheduler newThreadScheduler = new NewThreadScheduler();

        IObservable<Timestamped<int>> myQueueTimestamped = observableQueue.Timestamp();

        var bufferedTimestampedQueue = myQueueTimestamped.Buffer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3), newThreadScheduler);

        var t = new TaskFactory();
        t.StartNew(() => addToQueueAction(holderQueue));

        using(bufferedTimestampedQueue.SubscribeOn(newThreadScheduler).Subscribe(currentQueue =>
        {
            Console.WriteLine("buffer time elapsed, current queue contents is: {0} items.", currentQueue.Count);
            foreach(var item in currentQueue)
                Console.WriteLine("item {0} at {1}", item.Value, item.Timestamp);

            Console.WriteLine("holderqueue has: {0}", currentQueue.Count);
        }))
        {
            Console.WriteLine("started observing queue");

            Console.ReadLine();
        }
    }

    private static void AddToQueue(ConcurrentQueue<int> concurrentQueue)
    {
        while(true)
        {
            var x = new Random().Next(1, 10);
            concurrentQueue.Enqueue(x);
            Console.WriteLine("added {0}", x);
            Console.WriteLine("crtcount is: {0}", concurrentQueue.Count);
            Thread.Sleep(1000);
        }
    }

    private static IEnumerable<int> GetInitialElements()
    {
        var random = new Random();
        var items = new List<int>();
        for (int i = 0; i < 10; i++)
            items.Add(random.Next(1, 10));

        return items;
    }

意向如下:

holderQueue 对象最初由一些元素 (GetInitialElements) 填充,然后在另一个线程上用更多元素进行更改(通过 AddToQueue 方法),可观察到的是应该检测到这种变化,并在它的时间过去时(所以每 3 秒)通过在其订阅中执行该方法做出相应的反应。

所以简而言之,我期望 Subscribe 主体中的代码每 3 秒执行一次,并向我显示队列中的更改(在不同的线程上更改)。相反 Subscribe 主体只执行一次。为什么?

谢谢

ToObservable 方法采用 IEnumerable<T> 并将其转换为可观察对象。结果,它将获取并发队列并立即枚举它,运行 遍历所有可用项目。您稍后修改队列以添加其他项目这一事实对从并发队列的 GetEnumerator() 实现返回的已枚举 IEnumerable<T> 没有影响。

根据 David Pfeffer 的回答,仅使用 .ToObserverable() 无法满足您的需求。

但是,当我查看您的代码时,我看到了几件事:

  1. 您正在使用 NewThreadScheduler
  2. 您正在通过任务添加到队列
  3. 您正在使用 ConcurrentQueue<T>

我认为只要改变一些地方就可以实现你在这里设定的目标。首先,我认为您实际上是在寻找 BlockingCollection<T>。我知道这似乎不太可能,但您可以让它像线程安全队列一样工作。

接下来您已经将一个线程专用于对 NewThreadScheduler 执行某些操作,为什么不让队列中的 polling/pulling 执行此操作?

最后,如果你使用了BlockingCollection<T>.GetConsumingEnumerable(CancellationToken)方法,其实你可以回去使用那个.ToObservable()方法!

那么让我们看看重写后的代码:

static void Main()
{
    //The processing thread. I try to set the the thread name as these tend to be long lived. This helps logs and debugging.
    IScheduler newThreadScheduler = new NewThreadScheduler(ts=>{
        var t =  new Thread(ts);
        t.Name = "QueueReader";
        t.IsBackground = true;
        return t;
    });

    //Provide the ability to cancel our work
    var cts = new CancellationTokenSource();

    //Use a BlockingCollection<T> instead of a ConcurrentQueue<T>
    var holderQueue = new BlockingCollection<int>();
    foreach (var element in GetInitialElements())
    {
        holderQueue.Add(element);
    }

    //The Action that periodically adds items to the queue. Now has cancellation support
    Action<BlockingCollection<int>,CancellationToken> addToQueueAction = AddToQueue;
    var tf = new TaskFactory();
    tf.StartNew(() => addToQueueAction(holderQueue, cts.Token));

    //Get a consuming enumerable. MoveNext on this will remove the item from the BlockingCollection<T> effectively making it a queue. 
    //  Calling MoveNext on an empty queue will block until cancelled or an item is added.
    var consumingEnumerable = holderQueue.GetConsumingEnumerable(cts.Token);

    //Now we can make this Observable, as the underlying IEnumerbale<T> is a blocking consumer.
    //  Run on the QueueReader/newThreadScheduler thread.
    //  Use CancelationToken instead of IDisposable for single method of cancellation.
    consumingEnumerable.ToObservable(newThreadScheduler)
        .Timestamp()
        .Buffer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3), newThreadScheduler)
        .Subscribe(buffer =>
            {
                Console.WriteLine("buffer time elapsed, current queue contents is: {0} items.", buffer.Count);
                foreach(var item in buffer)
                    Console.WriteLine("item {0} at {1}", item.Value, item.Timestamp);

                Console.WriteLine("holderqueue has: {0}", holderQueue.Count);
            },
            cts.Token);


    Console.WriteLine("started observing queue");

    //Run until [Enter] is pressed by user.
    Console.ReadLine();

    //Cancel the production of values, the wait on the consuming enumerable and the subscription.
    cts.Cancel();
    Console.WriteLine("Cancelled");
}

private static void AddToQueue(BlockingCollection<int> input, CancellationToken cancellationToken)
{
    while(!cancellationToken.IsCancellationRequested)
    {
        var x = new Random().Next(1, 10);
        input.Add(x);
        Console.WriteLine("added '{0}'. Count={1}", x, input.Count);
        Thread.Sleep(1000);
    }
}

private static IEnumerable<int> GetInitialElements()
{
    var random = new Random();
    var items = new List<int>();
    for (int i = 0; i < 10; i++)
        items.Add(random.Next(1, 10));

    return items;
}

现在我想你会得到你期待的结果:

added '9'. Count=11
started observing queue
added '4'. Count=1
added '8'. Count=1
added '3'. Count=1
buffer time elapsed, current queue contents is: 14 items.
item 9 at 25/01/2015 22:25:35 +00:00
item 5 at 25/01/2015 22:25:35 +00:00
item 5 at 25/01/2015 22:25:35 +00:00
item 9 at 25/01/2015 22:25:35 +00:00
item 7 at 25/01/2015 22:25:35 +00:00
item 6 at 25/01/2015 22:25:35 +00:00
item 2 at 25/01/2015 22:25:35 +00:00
item 2 at 25/01/2015 22:25:35 +00:00
item 9 at 25/01/2015 22:25:35 +00:00
item 3 at 25/01/2015 22:25:35 +00:00
item 9 at 25/01/2015 22:25:35 +00:00
item 4 at 25/01/2015 22:25:36 +00:00
item 8 at 25/01/2015 22:25:37 +00:00
item 3 at 25/01/2015 22:25:38 +00:00
holderqueue has: 0
added '7'. Count=1
added '2'. Count=1
added '5'. Count=1
buffer time elapsed, current queue contents is: 3 items.
item 7 at 25/01/2015 22:25:39 +00:00
item 2 at 25/01/2015 22:25:40 +00:00
item 5 at 25/01/2015 22:25:41 +00:00
holderqueue has: 0
Cancelled