Observable 不对队列在不同线程上更改做出反应
Observable not reacting to queue changed on different thread
我有以下代码:
static void Main()
{
var holderQueue = new ConcurrentQueue<int>(GetInitialElements());
Action<ConcurrentQueue<int>> addToQueueAction = AddToQueue;
var observableQueue = holderQueue.ToObservable();
IScheduler newThreadScheduler = new NewThreadScheduler();
IObservable<Timestamped<int>> myQueueTimestamped = observableQueue.Timestamp();
var bufferedTimestampedQueue = myQueueTimestamped.Buffer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3), newThreadScheduler);
var t = new TaskFactory();
t.StartNew(() => addToQueueAction(holderQueue));
using(bufferedTimestampedQueue.SubscribeOn(newThreadScheduler).Subscribe(currentQueue =>
{
Console.WriteLine("buffer time elapsed, current queue contents is: {0} items.", currentQueue.Count);
foreach(var item in currentQueue)
Console.WriteLine("item {0} at {1}", item.Value, item.Timestamp);
Console.WriteLine("holderqueue has: {0}", currentQueue.Count);
}))
{
Console.WriteLine("started observing queue");
Console.ReadLine();
}
}
private static void AddToQueue(ConcurrentQueue<int> concurrentQueue)
{
while(true)
{
var x = new Random().Next(1, 10);
concurrentQueue.Enqueue(x);
Console.WriteLine("added {0}", x);
Console.WriteLine("crtcount is: {0}", concurrentQueue.Count);
Thread.Sleep(1000);
}
}
private static IEnumerable<int> GetInitialElements()
{
var random = new Random();
var items = new List<int>();
for (int i = 0; i < 10; i++)
items.Add(random.Next(1, 10));
return items;
}
意向如下:
holderQueue
对象最初由一些元素 (GetInitialElements
) 填充,然后在另一个线程上用更多元素进行更改(通过 AddToQueue
方法),可观察到的是应该检测到这种变化,并在它的时间过去时(所以每 3 秒)通过在其订阅中执行该方法做出相应的反应。
所以简而言之,我期望 Subscribe
主体中的代码每 3 秒执行一次,并向我显示队列中的更改(在不同的线程上更改)。相反 Subscribe
主体只执行一次。为什么?
谢谢
ToObservable
方法采用 IEnumerable<T>
并将其转换为可观察对象。结果,它将获取并发队列并立即枚举它,运行 遍历所有可用项目。您稍后修改队列以添加其他项目这一事实对从并发队列的 GetEnumerator()
实现返回的已枚举 IEnumerable<T>
没有影响。
根据 David Pfeffer 的回答,仅使用 .ToObserverable()
无法满足您的需求。
但是,当我查看您的代码时,我看到了几件事:
- 您正在使用
NewThreadScheduler
- 您正在通过任务添加到队列
- 您正在使用
ConcurrentQueue<T>
我认为只要改变一些地方就可以实现你在这里设定的目标。首先,我认为您实际上是在寻找 BlockingCollection<T>
。我知道这似乎不太可能,但您可以让它像线程安全队列一样工作。
接下来您已经将一个线程专用于对 NewThreadScheduler
执行某些操作,为什么不让队列中的 polling/pulling 执行此操作?
最后,如果你使用了BlockingCollection<T>.GetConsumingEnumerable(CancellationToken)
方法,其实你可以回去使用那个.ToObservable()
方法!
那么让我们看看重写后的代码:
static void Main()
{
//The processing thread. I try to set the the thread name as these tend to be long lived. This helps logs and debugging.
IScheduler newThreadScheduler = new NewThreadScheduler(ts=>{
var t = new Thread(ts);
t.Name = "QueueReader";
t.IsBackground = true;
return t;
});
//Provide the ability to cancel our work
var cts = new CancellationTokenSource();
//Use a BlockingCollection<T> instead of a ConcurrentQueue<T>
var holderQueue = new BlockingCollection<int>();
foreach (var element in GetInitialElements())
{
holderQueue.Add(element);
}
//The Action that periodically adds items to the queue. Now has cancellation support
Action<BlockingCollection<int>,CancellationToken> addToQueueAction = AddToQueue;
var tf = new TaskFactory();
tf.StartNew(() => addToQueueAction(holderQueue, cts.Token));
//Get a consuming enumerable. MoveNext on this will remove the item from the BlockingCollection<T> effectively making it a queue.
// Calling MoveNext on an empty queue will block until cancelled or an item is added.
var consumingEnumerable = holderQueue.GetConsumingEnumerable(cts.Token);
//Now we can make this Observable, as the underlying IEnumerbale<T> is a blocking consumer.
// Run on the QueueReader/newThreadScheduler thread.
// Use CancelationToken instead of IDisposable for single method of cancellation.
consumingEnumerable.ToObservable(newThreadScheduler)
.Timestamp()
.Buffer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3), newThreadScheduler)
.Subscribe(buffer =>
{
Console.WriteLine("buffer time elapsed, current queue contents is: {0} items.", buffer.Count);
foreach(var item in buffer)
Console.WriteLine("item {0} at {1}", item.Value, item.Timestamp);
Console.WriteLine("holderqueue has: {0}", holderQueue.Count);
},
cts.Token);
Console.WriteLine("started observing queue");
//Run until [Enter] is pressed by user.
Console.ReadLine();
//Cancel the production of values, the wait on the consuming enumerable and the subscription.
cts.Cancel();
Console.WriteLine("Cancelled");
}
private static void AddToQueue(BlockingCollection<int> input, CancellationToken cancellationToken)
{
while(!cancellationToken.IsCancellationRequested)
{
var x = new Random().Next(1, 10);
input.Add(x);
Console.WriteLine("added '{0}'. Count={1}", x, input.Count);
Thread.Sleep(1000);
}
}
private static IEnumerable<int> GetInitialElements()
{
var random = new Random();
var items = new List<int>();
for (int i = 0; i < 10; i++)
items.Add(random.Next(1, 10));
return items;
}
现在我想你会得到你期待的结果:
added '9'. Count=11
started observing queue
added '4'. Count=1
added '8'. Count=1
added '3'. Count=1
buffer time elapsed, current queue contents is: 14 items.
item 9 at 25/01/2015 22:25:35 +00:00
item 5 at 25/01/2015 22:25:35 +00:00
item 5 at 25/01/2015 22:25:35 +00:00
item 9 at 25/01/2015 22:25:35 +00:00
item 7 at 25/01/2015 22:25:35 +00:00
item 6 at 25/01/2015 22:25:35 +00:00
item 2 at 25/01/2015 22:25:35 +00:00
item 2 at 25/01/2015 22:25:35 +00:00
item 9 at 25/01/2015 22:25:35 +00:00
item 3 at 25/01/2015 22:25:35 +00:00
item 9 at 25/01/2015 22:25:35 +00:00
item 4 at 25/01/2015 22:25:36 +00:00
item 8 at 25/01/2015 22:25:37 +00:00
item 3 at 25/01/2015 22:25:38 +00:00
holderqueue has: 0
added '7'. Count=1
added '2'. Count=1
added '5'. Count=1
buffer time elapsed, current queue contents is: 3 items.
item 7 at 25/01/2015 22:25:39 +00:00
item 2 at 25/01/2015 22:25:40 +00:00
item 5 at 25/01/2015 22:25:41 +00:00
holderqueue has: 0
Cancelled
我有以下代码:
static void Main()
{
var holderQueue = new ConcurrentQueue<int>(GetInitialElements());
Action<ConcurrentQueue<int>> addToQueueAction = AddToQueue;
var observableQueue = holderQueue.ToObservable();
IScheduler newThreadScheduler = new NewThreadScheduler();
IObservable<Timestamped<int>> myQueueTimestamped = observableQueue.Timestamp();
var bufferedTimestampedQueue = myQueueTimestamped.Buffer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3), newThreadScheduler);
var t = new TaskFactory();
t.StartNew(() => addToQueueAction(holderQueue));
using(bufferedTimestampedQueue.SubscribeOn(newThreadScheduler).Subscribe(currentQueue =>
{
Console.WriteLine("buffer time elapsed, current queue contents is: {0} items.", currentQueue.Count);
foreach(var item in currentQueue)
Console.WriteLine("item {0} at {1}", item.Value, item.Timestamp);
Console.WriteLine("holderqueue has: {0}", currentQueue.Count);
}))
{
Console.WriteLine("started observing queue");
Console.ReadLine();
}
}
private static void AddToQueue(ConcurrentQueue<int> concurrentQueue)
{
while(true)
{
var x = new Random().Next(1, 10);
concurrentQueue.Enqueue(x);
Console.WriteLine("added {0}", x);
Console.WriteLine("crtcount is: {0}", concurrentQueue.Count);
Thread.Sleep(1000);
}
}
private static IEnumerable<int> GetInitialElements()
{
var random = new Random();
var items = new List<int>();
for (int i = 0; i < 10; i++)
items.Add(random.Next(1, 10));
return items;
}
意向如下:
holderQueue
对象最初由一些元素 (GetInitialElements
) 填充,然后在另一个线程上用更多元素进行更改(通过 AddToQueue
方法),可观察到的是应该检测到这种变化,并在它的时间过去时(所以每 3 秒)通过在其订阅中执行该方法做出相应的反应。
所以简而言之,我期望 Subscribe
主体中的代码每 3 秒执行一次,并向我显示队列中的更改(在不同的线程上更改)。相反 Subscribe
主体只执行一次。为什么?
谢谢
ToObservable
方法采用 IEnumerable<T>
并将其转换为可观察对象。结果,它将获取并发队列并立即枚举它,运行 遍历所有可用项目。您稍后修改队列以添加其他项目这一事实对从并发队列的 GetEnumerator()
实现返回的已枚举 IEnumerable<T>
没有影响。
根据 David Pfeffer 的回答,仅使用 .ToObserverable()
无法满足您的需求。
但是,当我查看您的代码时,我看到了几件事:
- 您正在使用
NewThreadScheduler
- 您正在通过任务添加到队列
- 您正在使用
ConcurrentQueue<T>
我认为只要改变一些地方就可以实现你在这里设定的目标。首先,我认为您实际上是在寻找 BlockingCollection<T>
。我知道这似乎不太可能,但您可以让它像线程安全队列一样工作。
接下来您已经将一个线程专用于对 NewThreadScheduler
执行某些操作,为什么不让队列中的 polling/pulling 执行此操作?
最后,如果你使用了BlockingCollection<T>.GetConsumingEnumerable(CancellationToken)
方法,其实你可以回去使用那个.ToObservable()
方法!
那么让我们看看重写后的代码:
static void Main()
{
//The processing thread. I try to set the the thread name as these tend to be long lived. This helps logs and debugging.
IScheduler newThreadScheduler = new NewThreadScheduler(ts=>{
var t = new Thread(ts);
t.Name = "QueueReader";
t.IsBackground = true;
return t;
});
//Provide the ability to cancel our work
var cts = new CancellationTokenSource();
//Use a BlockingCollection<T> instead of a ConcurrentQueue<T>
var holderQueue = new BlockingCollection<int>();
foreach (var element in GetInitialElements())
{
holderQueue.Add(element);
}
//The Action that periodically adds items to the queue. Now has cancellation support
Action<BlockingCollection<int>,CancellationToken> addToQueueAction = AddToQueue;
var tf = new TaskFactory();
tf.StartNew(() => addToQueueAction(holderQueue, cts.Token));
//Get a consuming enumerable. MoveNext on this will remove the item from the BlockingCollection<T> effectively making it a queue.
// Calling MoveNext on an empty queue will block until cancelled or an item is added.
var consumingEnumerable = holderQueue.GetConsumingEnumerable(cts.Token);
//Now we can make this Observable, as the underlying IEnumerbale<T> is a blocking consumer.
// Run on the QueueReader/newThreadScheduler thread.
// Use CancelationToken instead of IDisposable for single method of cancellation.
consumingEnumerable.ToObservable(newThreadScheduler)
.Timestamp()
.Buffer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3), newThreadScheduler)
.Subscribe(buffer =>
{
Console.WriteLine("buffer time elapsed, current queue contents is: {0} items.", buffer.Count);
foreach(var item in buffer)
Console.WriteLine("item {0} at {1}", item.Value, item.Timestamp);
Console.WriteLine("holderqueue has: {0}", holderQueue.Count);
},
cts.Token);
Console.WriteLine("started observing queue");
//Run until [Enter] is pressed by user.
Console.ReadLine();
//Cancel the production of values, the wait on the consuming enumerable and the subscription.
cts.Cancel();
Console.WriteLine("Cancelled");
}
private static void AddToQueue(BlockingCollection<int> input, CancellationToken cancellationToken)
{
while(!cancellationToken.IsCancellationRequested)
{
var x = new Random().Next(1, 10);
input.Add(x);
Console.WriteLine("added '{0}'. Count={1}", x, input.Count);
Thread.Sleep(1000);
}
}
private static IEnumerable<int> GetInitialElements()
{
var random = new Random();
var items = new List<int>();
for (int i = 0; i < 10; i++)
items.Add(random.Next(1, 10));
return items;
}
现在我想你会得到你期待的结果:
added '9'. Count=11
started observing queue
added '4'. Count=1
added '8'. Count=1
added '3'. Count=1
buffer time elapsed, current queue contents is: 14 items.
item 9 at 25/01/2015 22:25:35 +00:00
item 5 at 25/01/2015 22:25:35 +00:00
item 5 at 25/01/2015 22:25:35 +00:00
item 9 at 25/01/2015 22:25:35 +00:00
item 7 at 25/01/2015 22:25:35 +00:00
item 6 at 25/01/2015 22:25:35 +00:00
item 2 at 25/01/2015 22:25:35 +00:00
item 2 at 25/01/2015 22:25:35 +00:00
item 9 at 25/01/2015 22:25:35 +00:00
item 3 at 25/01/2015 22:25:35 +00:00
item 9 at 25/01/2015 22:25:35 +00:00
item 4 at 25/01/2015 22:25:36 +00:00
item 8 at 25/01/2015 22:25:37 +00:00
item 3 at 25/01/2015 22:25:38 +00:00
holderqueue has: 0
added '7'. Count=1
added '2'. Count=1
added '5'. Count=1
buffer time elapsed, current queue contents is: 3 items.
item 7 at 25/01/2015 22:25:39 +00:00
item 2 at 25/01/2015 22:25:40 +00:00
item 5 at 25/01/2015 22:25:41 +00:00
holderqueue has: 0
Cancelled