假脱机由 Observable.FromEvent 生成的正在进行的项目
Spooling ongoing items generated by Observable.FromEvent
我的目标是将所有 items/notifications 从 IObservable<T>
发送给未来的订阅者。
例如如果有人订阅消息流,他首先会收到订阅之前的所有消息。然后他开始接收新消息,只要有消息。这应该无缝地发生,在新旧消息之间的 "boundary" 上没有重复和丢失。
我想到了以下扩展方法:
public static IObservable<T> WithHistory<T>(this IObservable<T> source)
{
var accumulator = new BlockingCollection<T>();
source.Subscribe(accumulator.Add);
return accumulator
.GetConsumingEnumerable()
.ToObservable()
.SubscribeOn(ThreadPoolScheduler.Instance);
}
据我测试,它有效:
class Generator<T>
{
event Action<T> onPush;
public IObservable<T> Items =>
Observable.FromEvent<T>(d => onPush += d, d => onPush -= d);
public void Push(T item) => onPush?.Invoke(item);
}
...
private static void Main()
{
var g = new Generator<int>();
var ongoingItems = g.Items;
var allItems = g.Items.WithHistory();
g.Push(1);
g.Push(2);
ongoingItems.Subscribe(x => Console.WriteLine($"Ongoing: got {x}"));
allItems.Subscribe(x => Console.WriteLine($"WithHistory: got {x}"));
g.Push(3);
g.Push(4);
g.Push(5);
Console.ReadLine();
}
结果:
Ongoing: got 3
Ongoing: got 4
Ongoing: got 5
WithHistory: got 1
WithHistory: got 2
WithHistory: got 3
WithHistory: got 4
WithHistory: got 5
但是,使用 BlockingCollection<T>
似乎有些矫枉过正。此外,上述方法不支持完成、错误处理,如果没有 .SubscribeOn(ThreadPoolScheduler.Instance)
.
会导致死锁
有没有更好的方法来实现它,并且没有描述的缺陷?
最好的方法是 .Replay()
void Main()
{
var g = new Generator<int>();
var ongoingItems = g.Items;
var allItems = g.Items.Replay().RefCount();
using(var tempSubscriber = allItems.Subscribe())
{
g.Push(1);
g.Push(2);
ongoingItems.Subscribe(x => Console.WriteLine($"Ongoing: got {x}"));
allItems.Subscribe(x => Console.WriteLine($"WithHistory: got {x}"));
g.Push(3);
g.Push(4);
g.Push(5);
Console.ReadLine();
}
}
.Replay().RefCount()
生成一个可观察对象,只要有订阅者,该对象就会保留一个内部队列进行重播。如果你有一个持久订阅者(就像你的解决方案在 WithHistory
方法中所做的那样),你就会发生内存泄漏。解决这个问题的最好方法是拥有一个临时订阅者,它会在您对历史不再感兴趣后自动断开连接。
我的目标是将所有 items/notifications 从 IObservable<T>
发送给未来的订阅者。
例如如果有人订阅消息流,他首先会收到订阅之前的所有消息。然后他开始接收新消息,只要有消息。这应该无缝地发生,在新旧消息之间的 "boundary" 上没有重复和丢失。
我想到了以下扩展方法:
public static IObservable<T> WithHistory<T>(this IObservable<T> source)
{
var accumulator = new BlockingCollection<T>();
source.Subscribe(accumulator.Add);
return accumulator
.GetConsumingEnumerable()
.ToObservable()
.SubscribeOn(ThreadPoolScheduler.Instance);
}
据我测试,它有效:
class Generator<T>
{
event Action<T> onPush;
public IObservable<T> Items =>
Observable.FromEvent<T>(d => onPush += d, d => onPush -= d);
public void Push(T item) => onPush?.Invoke(item);
}
...
private static void Main()
{
var g = new Generator<int>();
var ongoingItems = g.Items;
var allItems = g.Items.WithHistory();
g.Push(1);
g.Push(2);
ongoingItems.Subscribe(x => Console.WriteLine($"Ongoing: got {x}"));
allItems.Subscribe(x => Console.WriteLine($"WithHistory: got {x}"));
g.Push(3);
g.Push(4);
g.Push(5);
Console.ReadLine();
}
结果:
Ongoing: got 3 Ongoing: got 4 Ongoing: got 5 WithHistory: got 1 WithHistory: got 2 WithHistory: got 3 WithHistory: got 4 WithHistory: got 5
但是,使用 BlockingCollection<T>
似乎有些矫枉过正。此外,上述方法不支持完成、错误处理,如果没有 .SubscribeOn(ThreadPoolScheduler.Instance)
.
有没有更好的方法来实现它,并且没有描述的缺陷?
最好的方法是 .Replay()
void Main()
{
var g = new Generator<int>();
var ongoingItems = g.Items;
var allItems = g.Items.Replay().RefCount();
using(var tempSubscriber = allItems.Subscribe())
{
g.Push(1);
g.Push(2);
ongoingItems.Subscribe(x => Console.WriteLine($"Ongoing: got {x}"));
allItems.Subscribe(x => Console.WriteLine($"WithHistory: got {x}"));
g.Push(3);
g.Push(4);
g.Push(5);
Console.ReadLine();
}
}
.Replay().RefCount()
生成一个可观察对象,只要有订阅者,该对象就会保留一个内部队列进行重播。如果你有一个持久订阅者(就像你的解决方案在 WithHistory
方法中所做的那样),你就会发生内存泄漏。解决这个问题的最好方法是拥有一个临时订阅者,它会在您对历史不再感兴趣后自动断开连接。