限制调度程序中的通知数量
Limit number of notifications in scheduler
我有一个冷观察者和一个观察者。
两者都很慢,但观察者比可观察者慢。
他们处理很多很多通知,所以我不想无限制地存储通知。
此示例大约需要 30 秒才能完成。非常慢。我相信他们可以在 21 秒内完成。
var subject = new Subject<int>();
subject.Subscribe(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"{DateTime.Now:HH:mm:ss} - {i}");
});
Task.Run(() =>
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss} - Start");
foreach (var i in Enumerable.Range(0, 10))
{
Thread.Sleep(1000);
subject.OnNext(i);
}
Console.WriteLine($"{DateTime.Now:HH:mm:ss} - End");
});
此示例大约在 20 秒内完成,但可观察对象在观察者显示“4”之前结束。
它表示调度程序在某处存储了 4 到 9。恐怕如果它存储超过 1'000'000 个通知并抛出 OutOfMemoryException。
var subject = new Subject<int>();
subject.ObserveOn(ThreadPoolScheduler.Instance).Subscribe(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"{DateTime.Now:HH:mm:ss} - {i}");
});
Task.Run(() =>
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss} - Start");
foreach (var i in Enumerable.Range(0, 10))
{
Thread.Sleep(1000);
subject.OnNext(i);
}
Console.WriteLine($"{DateTime.Now:HH:mm:ss} - End");
});
这就是为什么我想限制调度程序中的通知数量。
编辑:图表
x : calculate or other task
S : send notification
R : receive notification
-- time -->
sample1
thread1: xxxS xxxS xxxS
thread2: Rxxxxxx Rxxxxxx Rxxxxxx
sample2
thread1: xxxSxxxSxxxSxxxSxxxSxxxSxxxS
thread2: RxxxxxxRxxxxxxRxxxxxxRxxxxxx
I want
thread1: xxxSxxxS xxxS xxxS xxxS
thread2: RxxxxxxRxxxxxxRxxxxxxRxxxxxx
Rx 合约要求通知序列化,因此即使您可以指定 Scheduler
,它更类似于说 "here, use this scheduler for managing concurrency"。
ThreadPoolScheduler
仍将序列化通知,因此最终结果是它不会并行调用您的方法。
如果要异步执行,可以改写成这样:
subject.Subscribe(async i =>
{
await Task.Delay(1000);
Console.WriteLine($"{DateTime.Now:HH:mm:ss} - {i}");
});
但潜在的问题是你的消费者落后于你的生产者。
您可以考虑使用 backpressure,或者如果您的应用程序是一系列数据处理任务,您还可以查看优秀的 TPL Dataflow。
在事件源和最终接收器之间的管道中发生了什么,这就是 Rx 最擅长的地方。
您可以尝试使用 SemaphoreSlim
.
实施自定义 IScheduler
来限制请求(方法 Schedule
)
或者,您可以创建一个 BlockingThrottle
扩展方法来接受和 returns 一个 IObservable
,您可以在订阅它之前链接到原始 IObservable
。这是一个使用 BlockingCollection
作为节流机制的实现:
private static IObservable<T> BlockingThrottle<T>(this IObservable<T> source,
int boundedCapacity)
{
return Observable.Create<T>(observer =>
{
var queue = new BlockingCollection<T>(boundedCapacity);
var cts = new CancellationTokenSource();
var locker = new object();
Exception exception = null;
new Thread(() =>
{
try
{
foreach (var item in queue.GetConsumingEnumerable(cts.Token))
{
observer.OnNext(item);
}
observer.OnCompleted();
}
catch (OperationCanceledException)
{
Exception ex; lock (locker) ex = exception;
if (ex != null) observer.OnError(ex);
}
// Leave all other exceptions unhandled.
// The responsibility for catching them belongs to the caller.
})
{ IsBackground = true }.Start();
var subscription = source.Subscribe(x =>
{
try
{
queue.Add(x, cts.Token);
}
catch (OperationCanceledException) { } // Ignore this exception too
}, ex =>
{
lock (locker) exception = ex;
cts.Cancel();
}, () =>
{
queue.CompleteAdding();
});
return Disposable.Create(() =>
{
cts.Cancel();
subscription.Dispose();
});
});
}
用法示例:
subject.BlockingThrottle(boundedCapacity: 10).Subscribe(i =>
注意: 如果您打算在 ASP.NET 应用程序中使用它,请考虑将 BlockingCollection
替换为异步队列(如 BufferBlock<T>
或 Channel<T>
),以避免阻塞线程。
我有一个冷观察者和一个观察者。 两者都很慢,但观察者比可观察者慢。 他们处理很多很多通知,所以我不想无限制地存储通知。
此示例大约需要 30 秒才能完成。非常慢。我相信他们可以在 21 秒内完成。
var subject = new Subject<int>();
subject.Subscribe(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"{DateTime.Now:HH:mm:ss} - {i}");
});
Task.Run(() =>
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss} - Start");
foreach (var i in Enumerable.Range(0, 10))
{
Thread.Sleep(1000);
subject.OnNext(i);
}
Console.WriteLine($"{DateTime.Now:HH:mm:ss} - End");
});
此示例大约在 20 秒内完成,但可观察对象在观察者显示“4”之前结束。 它表示调度程序在某处存储了 4 到 9。恐怕如果它存储超过 1'000'000 个通知并抛出 OutOfMemoryException。
var subject = new Subject<int>();
subject.ObserveOn(ThreadPoolScheduler.Instance).Subscribe(i =>
{
Thread.Sleep(2000);
Console.WriteLine($"{DateTime.Now:HH:mm:ss} - {i}");
});
Task.Run(() =>
{
Console.WriteLine($"{DateTime.Now:HH:mm:ss} - Start");
foreach (var i in Enumerable.Range(0, 10))
{
Thread.Sleep(1000);
subject.OnNext(i);
}
Console.WriteLine($"{DateTime.Now:HH:mm:ss} - End");
});
这就是为什么我想限制调度程序中的通知数量。
编辑:图表
x : calculate or other task
S : send notification
R : receive notification
-- time -->
sample1
thread1: xxxS xxxS xxxS
thread2: Rxxxxxx Rxxxxxx Rxxxxxx
sample2
thread1: xxxSxxxSxxxSxxxSxxxSxxxSxxxS
thread2: RxxxxxxRxxxxxxRxxxxxxRxxxxxx
I want
thread1: xxxSxxxS xxxS xxxS xxxS
thread2: RxxxxxxRxxxxxxRxxxxxxRxxxxxx
Rx 合约要求通知序列化,因此即使您可以指定 Scheduler
,它更类似于说 "here, use this scheduler for managing concurrency"。
ThreadPoolScheduler
仍将序列化通知,因此最终结果是它不会并行调用您的方法。
如果要异步执行,可以改写成这样:
subject.Subscribe(async i =>
{
await Task.Delay(1000);
Console.WriteLine($"{DateTime.Now:HH:mm:ss} - {i}");
});
但潜在的问题是你的消费者落后于你的生产者。 您可以考虑使用 backpressure,或者如果您的应用程序是一系列数据处理任务,您还可以查看优秀的 TPL Dataflow。
在事件源和最终接收器之间的管道中发生了什么,这就是 Rx 最擅长的地方。
您可以尝试使用 SemaphoreSlim
.
IScheduler
来限制请求(方法 Schedule
)
或者,您可以创建一个 BlockingThrottle
扩展方法来接受和 returns 一个 IObservable
,您可以在订阅它之前链接到原始 IObservable
。这是一个使用 BlockingCollection
作为节流机制的实现:
private static IObservable<T> BlockingThrottle<T>(this IObservable<T> source,
int boundedCapacity)
{
return Observable.Create<T>(observer =>
{
var queue = new BlockingCollection<T>(boundedCapacity);
var cts = new CancellationTokenSource();
var locker = new object();
Exception exception = null;
new Thread(() =>
{
try
{
foreach (var item in queue.GetConsumingEnumerable(cts.Token))
{
observer.OnNext(item);
}
observer.OnCompleted();
}
catch (OperationCanceledException)
{
Exception ex; lock (locker) ex = exception;
if (ex != null) observer.OnError(ex);
}
// Leave all other exceptions unhandled.
// The responsibility for catching them belongs to the caller.
})
{ IsBackground = true }.Start();
var subscription = source.Subscribe(x =>
{
try
{
queue.Add(x, cts.Token);
}
catch (OperationCanceledException) { } // Ignore this exception too
}, ex =>
{
lock (locker) exception = ex;
cts.Cancel();
}, () =>
{
queue.CompleteAdding();
});
return Disposable.Create(() =>
{
cts.Cancel();
subscription.Dispose();
});
});
}
用法示例:
subject.BlockingThrottle(boundedCapacity: 10).Subscribe(i =>
注意: 如果您打算在 ASP.NET 应用程序中使用它,请考虑将 BlockingCollection
替换为异步队列(如 BufferBlock<T>
或 Channel<T>
),以避免阻塞线程。