一个 Rx observable 将充当 ReplaySubject 但仅适用于第一个订阅者?
An Rx observable that would act as ReplaySubject but only for the 1st subscriber?
什么是构成类似于 ReplaySubject
, but only emit the accumulated sequence once and for the first subscriber only (when that subscriber is connected)? After the 1st subscription, it should act just as a regular Subject
.
的 Rx observable 的优雅方式
这是一个 .NET 项目,但我同样感谢 JavaScript/RxJS 个回答。
我为潜在的解决方案做了 google,我即将推出自己的解决方案,类似于 how I approached DistinctSubject
,最终。
我稍微修改了 中的实现,并将 class 的名称从 ReplayOnceSubject
更改为 ReplayFirstSubscriberOnlySubject
:
public class ReplayFirstSubscriberOnlySubject<T> : ISubject<T>
{
private readonly object _locker = new object();
private ISubject<T> _subject = new ReplaySubject<T>();
public void OnNext(T value) { lock (_locker) _subject.OnNext(value); }
public void OnError(Exception error) { lock (_locker) _subject.OnError(error); }
public void OnCompleted() { lock (_locker) _subject.OnCompleted(); }
public IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null) throw new ArgumentNullException(nameof(observer));
lock (_locker)
{
if (_subject is ReplaySubject<T> replaySubject)
{
var subject = new Subject<T>();
var subscription = subject.Subscribe(observer);
// Now replay the buffered notifications
replaySubject.Subscribe(subject).Dispose();
replaySubject.Dispose();
_subject = subject;
return subscription;
}
else
return _subject.Subscribe(observer);
}
}
}
这可能不是最有效的解决方案,因为每个操作都会获取两个不同的 lock
(_locker
和 internal _gate
),但是它
应该也不会很差吧。
什么是构成类似于 ReplaySubject
, but only emit the accumulated sequence once and for the first subscriber only (when that subscriber is connected)? After the 1st subscription, it should act just as a regular Subject
.
这是一个 .NET 项目,但我同样感谢 JavaScript/RxJS 个回答。
我为潜在的解决方案做了 google,我即将推出自己的解决方案,类似于 how I approached DistinctSubject
,最终。
我稍微修改了 ReplayOnceSubject
更改为 ReplayFirstSubscriberOnlySubject
:
public class ReplayFirstSubscriberOnlySubject<T> : ISubject<T>
{
private readonly object _locker = new object();
private ISubject<T> _subject = new ReplaySubject<T>();
public void OnNext(T value) { lock (_locker) _subject.OnNext(value); }
public void OnError(Exception error) { lock (_locker) _subject.OnError(error); }
public void OnCompleted() { lock (_locker) _subject.OnCompleted(); }
public IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null) throw new ArgumentNullException(nameof(observer));
lock (_locker)
{
if (_subject is ReplaySubject<T> replaySubject)
{
var subject = new Subject<T>();
var subscription = subject.Subscribe(observer);
// Now replay the buffered notifications
replaySubject.Subscribe(subject).Dispose();
replaySubject.Dispose();
_subject = subject;
return subscription;
}
else
return _subject.Subscribe(observer);
}
}
}
这可能不是最有效的解决方案,因为每个操作都会获取两个不同的 lock
(_locker
和 internal _gate
),但是它
应该也不会很差吧。