.Net 响应式扩展:删除重放

.Net Reactive extensions: Remove Replay

在下面的代码中,我使用 Replay() 来避免在执行 stuff [...] 时价格更新丢失。我需要在 stuff[...] 完成后发送这些价格更新。

var observable = Observable.FromEventPattern<Price>(h => book.PriceUpdated += h, h => book.PriceUpdated -= h)
                           .Replay();
observable.Connect();

// Do some stuff [...]

observable.Select(p => Observable.FromAsync(() => SendPriceUpdateAsync(p.Sender, p.EventArgs, socket)))
                      .Concat()
                      .Subscribe();

我想在 stuff [...] 完成并发送价格后删除 Replay()(我不想存储所有值,我不再需要这些值一次我已经发送了。

有简单的方法吗?

所以我找不到任何方法来使用运算符,但我们总是可以创建我们自己的可连接可观察对象 class! :)

public class UntilSubscribeReplay<T> : IObservable<T>
{
    private readonly IObservable<T> _parent;

    private HashSet<IObserver<T>> _observers = new HashSe
    private IDisposable _subscription;
    private readonly Queue<T> _itemsQueue = new Queue<T>(

    public UntilSubscribeReplay(IObservable<T> parent)
    {
        _parent = parent;
        Subscribe();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (_subscription == null)
            Subscribe();

        _observers.Add(observer);
        EmitQueuedItems();

        return Disposable.Create(() =>
        {
            _observers.Remove(observer);
            if (_observers.Count == 0)
                _subscription.Dispose();
        });
    }

    private void Subscribe()
    {
        _subscription = _parent
            .Do(_itemsQueue.Enqueue)
            .Subscribe(_ => EmitQueuedItems());
    }

    private void EmitQueuedItems()
    {
        if (_observers.Count == 0)
            return;

        while (_itemsQueue.TryDequeue(out T item))
        {
            foreach (IObserver<T> observer in _observers)
            {
                observer.OnNext(item);
            }
        }
    }
}

当然,为了方便起见,还有一种扩展方法。 :)

public static class MyObservableExtensions
{
    public static IObservable<T> UntilSubscribeReplay<T>(this IObservable<T> source) 
        => new UntilSubscribeReplay<T>(source);
}

下面是您将如何使用它:

IObservable<int> myReplay = myObservable.UntilSubscribeReplay();

// Do Stuff

myReplay.Subscribe(value => HandleValue(value));

请注意,不需要使用连接。正确的方法是将其实现为 Connectable Observable(为了避免在使用扩展但最终不订阅时发生内存泄漏),但对于您的特定情况,它应该足够了。

让我知道进展如何。 :)

所以你真正想要的是延迟所有事件,直到你完成做一些事情。我建议完全不要使用 Replay。

改为尝试这样的方法:

var observable = eventObservable
.TakeUntil(Observable.Start(() => { /* Do some stuff [...] */ })) //collect only events while processing, completes the observable when done processing
.ToList() //with ToList only propagate all at once on completion
.SelectMany(list => list.ToObservable()) //unfold the list to single events again
.Concat(eventObservable); //continue with normal events observable