Rx.NET 中是否有一个 Subject 实现在功能上类似于 BehaviorSubject 但仅在值已更改时才发出?

Is there a Subject implementation in Rx.NET that functionally resembles BehaviorSubject but emits only if the value has changed?

是否存在 Subject implementation in Rx.NET that functionally resembles BehaviorSubject 但仅在更改后才发出下一个值?

我对 Reactive Extensions 比较陌生,我似乎找不到类似的东西,尽管这种模式感觉像是 INotifyPropertyChanged.

的自然替代品

我天真的实现是像下面这样封装 BehaviorSubject<T>。与使用 Observable.DistinctUntilChanged 创建可组合的可观察对象相比,这有什么缺点吗?

    public class DistinctSubject<T> : SubjectBase<T>
    {
        private readonly BehaviorSubject<T> _subject;

        public DistinctSubject(T initialValue) =>
            _subject = new BehaviorSubject<T>(initialValue);

        public T Value 
        { 
            get => _subject.Value;
            set => this.OnNext(value);
        }

        public override bool HasObservers => _subject.HasObservers;

        public override bool IsDisposed => _subject.IsDisposed;

        public override void Dispose() => _subject.Dispose(); 

        public override void OnCompleted() => _subject.OnCompleted();   

        public override void OnError(Exception error) => _subject.OnError(error);

        public override void OnNext(T value)
        {
            if (!EqualityComparer<T>.Default.Equals(value, _subject.Value))
            {
                _subject.OnNext(value);
            }
        }

        public override IDisposable Subscribe(IObserver<T> observer) =>
            _subject.Subscribe(observer);
    }

稍微浏览了 BehaviorSubject<T> class 的 source code 之后,您的 DistinctSubject<T> 实现似乎会在 OnError 的情况下表现不同后跟 OnNext:

var subject = new DistinctSubject<int>(2021);
subject.OnError(new ApplicationException());
subject.OnNext(2022); // throws ApplicationException

这会抛出,而对 BehaviorSubject<T> 做同样的事情不会抛出(OnNext 被忽略)。

我的建议是在实现中使用DistinctUntilChanged运算符,像这样:

public class DistinctSubject<T> : ISubject<T>, IDisposable
{
    private readonly BehaviorSubject<T> _subject;
    private readonly IObservable<T> _distinctUntilChanged;

    public DistinctSubject(T initialValue, IEqualityComparer<T> comparer = default)
    {
        _subject = new BehaviorSubject<T>(initialValue);
        _distinctUntilChanged = _subject.DistinctUntilChanged(
            comparer ?? EqualityComparer<T>.Default);
    }

    public T Value => _subject.Value;
    public void OnNext(T value) => _subject.OnNext(value);
    public void OnError(Exception error) => _subject.OnError(error);
    public void OnCompleted() => _subject.OnCompleted();

    public IDisposable Subscribe(IObserver<T> observer) =>
        _distinctUntilChanged.Subscribe(observer);

    public void Dispose() => _subject.Dispose();
}

如果您担心对象的不必要分配,那么您还没有熟悉 Rx 的精神。这个库是关于功能和易用性的,而不是关于 或效率!