RX.net 的 DropQueue 机制
DropQueue mechanism for RX.net
我遇到了 RX.net 的背压问题,我找不到解决方案。我有一个可观察到的实时日志消息流。
var logObservable = /* Observable stream of log messages */
我想通过 TCP 接口公开,该接口在通过网络发送之前序列化来自 logObservable
的实时日志消息。所以我做了以下事情:
foreach (var message in logObservable.ToEnumerable())
{
// 1. Serialize message
// 2. Send it over the wire.
}
如果发生背压情况,例如 .ToEnumerable()
,问题就会出现。如果另一端的客户端暂停流。问题是 .ToEnumerable()
缓存了导致大量内存使用的项目。我正在寻找一种类似于 DropQueue
的机制,它只缓冲,比方说,最后 10 条消息,例如
var observableStream = logObservable.DropQueue(10).ToEnumerable();
这是解决这个问题的正确方法吗?你知道实施这样的机制来避免可能的背压问题吗?
我的 DropQueue
实现:
public static IEnumerable<TSource> ToDropQueue<TSource>(
this IObservable<TSource> source,
int queueSize,
Action backPressureNotification = null,
CancellationToken token = default(CancellationToken))
{
var queue = new BlockingCollection<TSource>(new ConcurrentQueue<TSource>(), queueSize);
var isBackPressureNotified = false;
var subscription = source.Subscribe(
item =>
{
var isBackPressure = queue.Count == queue.BoundedCapacity;
if (isBackPressure)
{
queue.Take(); // Dequeue an item to make space for the next one
// Fire back-pressure notification if defined
if (!isBackPressureNotified && backPressureNotification != null)
{
backPressureNotification();
isBackPressureNotified = true;
}
}
else
{
isBackPressureNotified = false;
}
queue.Add(item);
},
exception => queue.CompleteAdding(),
() => queue.CompleteAdding());
token.Register(() => { subscription.Dispose(); });
using (new CompositeDisposable(subscription, queue))
{
foreach (var item in queue.GetConsumingEnumerable())
{
yield return item;
}
}
}
我遇到了 RX.net 的背压问题,我找不到解决方案。我有一个可观察到的实时日志消息流。
var logObservable = /* Observable stream of log messages */
我想通过 TCP 接口公开,该接口在通过网络发送之前序列化来自 logObservable
的实时日志消息。所以我做了以下事情:
foreach (var message in logObservable.ToEnumerable())
{
// 1. Serialize message
// 2. Send it over the wire.
}
如果发生背压情况,例如 .ToEnumerable()
,问题就会出现。如果另一端的客户端暂停流。问题是 .ToEnumerable()
缓存了导致大量内存使用的项目。我正在寻找一种类似于 DropQueue
的机制,它只缓冲,比方说,最后 10 条消息,例如
var observableStream = logObservable.DropQueue(10).ToEnumerable();
这是解决这个问题的正确方法吗?你知道实施这样的机制来避免可能的背压问题吗?
我的 DropQueue
实现:
public static IEnumerable<TSource> ToDropQueue<TSource>(
this IObservable<TSource> source,
int queueSize,
Action backPressureNotification = null,
CancellationToken token = default(CancellationToken))
{
var queue = new BlockingCollection<TSource>(new ConcurrentQueue<TSource>(), queueSize);
var isBackPressureNotified = false;
var subscription = source.Subscribe(
item =>
{
var isBackPressure = queue.Count == queue.BoundedCapacity;
if (isBackPressure)
{
queue.Take(); // Dequeue an item to make space for the next one
// Fire back-pressure notification if defined
if (!isBackPressureNotified && backPressureNotification != null)
{
backPressureNotification();
isBackPressureNotified = true;
}
}
else
{
isBackPressureNotified = false;
}
queue.Add(item);
},
exception => queue.CompleteAdding(),
() => queue.CompleteAdding());
token.Register(() => { subscription.Dispose(); });
using (new CompositeDisposable(subscription, queue))
{
foreach (var item in queue.GetConsumingEnumerable())
{
yield return item;
}
}
}