.Net 响应式扩展:删除重放
.Net Reactive extensions: Remove Replay
在下面的代码中,我使用 Replay()
来避免在执行 stuff [...]
时价格更新丢失。我需要在 stuff[...]
完成后发送这些价格更新。
var observable = Observable.FromEventPattern<Price>(h => book.PriceUpdated += h, h => book.PriceUpdated -= h)
.Replay();
observable.Connect();
// Do some stuff [...]
observable.Select(p => Observable.FromAsync(() => SendPriceUpdateAsync(p.Sender, p.EventArgs, socket)))
.Concat()
.Subscribe();
我想在 stuff [...]
完成并发送价格后删除 Replay()
(我不想存储所有值,我不再需要这些值一次我已经发送了。
有简单的方法吗?
所以我找不到任何方法来使用运算符,但我们总是可以创建我们自己的可连接可观察对象 class! :)
public class UntilSubscribeReplay<T> : IObservable<T>
{
private readonly IObservable<T> _parent;
private HashSet<IObserver<T>> _observers = new HashSe
private IDisposable _subscription;
private readonly Queue<T> _itemsQueue = new Queue<T>(
public UntilSubscribeReplay(IObservable<T> parent)
{
_parent = parent;
Subscribe();
}
public IDisposable Subscribe(IObserver<T> observer)
{
if (_subscription == null)
Subscribe();
_observers.Add(observer);
EmitQueuedItems();
return Disposable.Create(() =>
{
_observers.Remove(observer);
if (_observers.Count == 0)
_subscription.Dispose();
});
}
private void Subscribe()
{
_subscription = _parent
.Do(_itemsQueue.Enqueue)
.Subscribe(_ => EmitQueuedItems());
}
private void EmitQueuedItems()
{
if (_observers.Count == 0)
return;
while (_itemsQueue.TryDequeue(out T item))
{
foreach (IObserver<T> observer in _observers)
{
observer.OnNext(item);
}
}
}
}
当然,为了方便起见,还有一种扩展方法。 :)
public static class MyObservableExtensions
{
public static IObservable<T> UntilSubscribeReplay<T>(this IObservable<T> source)
=> new UntilSubscribeReplay<T>(source);
}
下面是您将如何使用它:
IObservable<int> myReplay = myObservable.UntilSubscribeReplay();
// Do Stuff
myReplay.Subscribe(value => HandleValue(value));
请注意,不需要使用连接。正确的方法是将其实现为 Connectable Observable(为了避免在使用扩展但最终不订阅时发生内存泄漏),但对于您的特定情况,它应该足够了。
让我知道进展如何。 :)
所以你真正想要的是延迟所有事件,直到你完成做一些事情。我建议完全不要使用 Replay。
改为尝试这样的方法:
var observable = eventObservable
.TakeUntil(Observable.Start(() => { /* Do some stuff [...] */ })) //collect only events while processing, completes the observable when done processing
.ToList() //with ToList only propagate all at once on completion
.SelectMany(list => list.ToObservable()) //unfold the list to single events again
.Concat(eventObservable); //continue with normal events observable
在下面的代码中,我使用 Replay()
来避免在执行 stuff [...]
时价格更新丢失。我需要在 stuff[...]
完成后发送这些价格更新。
var observable = Observable.FromEventPattern<Price>(h => book.PriceUpdated += h, h => book.PriceUpdated -= h)
.Replay();
observable.Connect();
// Do some stuff [...]
observable.Select(p => Observable.FromAsync(() => SendPriceUpdateAsync(p.Sender, p.EventArgs, socket)))
.Concat()
.Subscribe();
我想在 stuff [...]
完成并发送价格后删除 Replay()
(我不想存储所有值,我不再需要这些值一次我已经发送了。
有简单的方法吗?
所以我找不到任何方法来使用运算符,但我们总是可以创建我们自己的可连接可观察对象 class! :)
public class UntilSubscribeReplay<T> : IObservable<T>
{
private readonly IObservable<T> _parent;
private HashSet<IObserver<T>> _observers = new HashSe
private IDisposable _subscription;
private readonly Queue<T> _itemsQueue = new Queue<T>(
public UntilSubscribeReplay(IObservable<T> parent)
{
_parent = parent;
Subscribe();
}
public IDisposable Subscribe(IObserver<T> observer)
{
if (_subscription == null)
Subscribe();
_observers.Add(observer);
EmitQueuedItems();
return Disposable.Create(() =>
{
_observers.Remove(observer);
if (_observers.Count == 0)
_subscription.Dispose();
});
}
private void Subscribe()
{
_subscription = _parent
.Do(_itemsQueue.Enqueue)
.Subscribe(_ => EmitQueuedItems());
}
private void EmitQueuedItems()
{
if (_observers.Count == 0)
return;
while (_itemsQueue.TryDequeue(out T item))
{
foreach (IObserver<T> observer in _observers)
{
observer.OnNext(item);
}
}
}
}
当然,为了方便起见,还有一种扩展方法。 :)
public static class MyObservableExtensions
{
public static IObservable<T> UntilSubscribeReplay<T>(this IObservable<T> source)
=> new UntilSubscribeReplay<T>(source);
}
下面是您将如何使用它:
IObservable<int> myReplay = myObservable.UntilSubscribeReplay();
// Do Stuff
myReplay.Subscribe(value => HandleValue(value));
请注意,不需要使用连接。正确的方法是将其实现为 Connectable Observable(为了避免在使用扩展但最终不订阅时发生内存泄漏),但对于您的特定情况,它应该足够了。
让我知道进展如何。 :)
所以你真正想要的是延迟所有事件,直到你完成做一些事情。我建议完全不要使用 Replay。
改为尝试这样的方法:
var observable = eventObservable
.TakeUntil(Observable.Start(() => { /* Do some stuff [...] */ })) //collect only events while processing, completes the observable when done processing
.ToList() //with ToList only propagate all at once on completion
.SelectMany(list => list.ToObservable()) //unfold the list to single events again
.Concat(eventObservable); //continue with normal events observable