如何清除 ReplaySubject 上的缓冲区?
How can I clear the buffer on a ReplaySubject?
如何清除 ReplaySubject
上的缓冲区?
我需要定期清除缓冲区(在我的情况下作为一天结束的事件)以防止 ReplaySubject
不断增长并最终吃掉所有内存。
理想情况下,我希望保持不变 ReplaySubject
,因为客户订阅仍然很好。
ReplaySubject
不提供清除缓冲区的方法,但有几种重载以不同方式限制其缓冲区:
- 最多
TimeSpan
项保留
- 最大项目数
- 上述的组合,只要满足任一条件就会掉落物品。
一个可清除的 ReplaySubject
这是一个非常有趣的问题 - 我决定看看实现 ReplaySubject
你 can clear 的变体是多么容易 - 使用现有的主题和运算符(因为它们非常坚固)。事实证明它相当简单。
我已经 运行 通过内存分析器检查它是否正确。调用 Clear()
刷新缓冲区,否则它就像一个普通的无界 ReplaySubject
:
public class RollingReplaySubject<T> : ISubject<T>
{
private readonly ReplaySubject<IObservable<T>> _subjects;
private readonly IObservable<T> _concatenatedSubjects;
private ISubject<T> _currentSubject;
public RollingReplaySubject()
{
_subjects = new ReplaySubject<IObservable<T>>(1);
_concatenatedSubjects = _subjects.Concat();
_currentSubject = new ReplaySubject<T>();
_subjects.OnNext(_currentSubject);
}
public void Clear()
{
_currentSubject.OnCompleted();
_currentSubject = new ReplaySubject<T>();
_subjects.OnNext(_currentSubject);
}
public void OnNext(T value)
{
_currentSubject.OnNext(value);
}
public void OnError(Exception error)
{
_currentSubject.OnError(error);
}
public void OnCompleted()
{
_currentSubject.OnCompleted();
_subjects.OnCompleted();
// a quick way to make the current ReplaySubject unreachable
// except to in-flight observers, and not hold up collection
_currentSubject = new Subject<T>();
}
public IDisposable Subscribe(IObserver<T> observer)
{
return _concatenatedSubjects.Subscribe(observer);
}
}
遵守通常的规则(与任何 Subject
一样)并且不要同时调用此 class 上的方法 - 包括 Clear()
。如果需要,您可以简单地添加同步锁。
它通过在主 ReplaySubject 中嵌套一系列 ReplaySubjects 来工作。外层 ReplaySubject (_subjects
) 保存了正好一个内层 ReplaySubject (_currentSubject
) 的缓冲区,并在构造时填充。
OnXXX
方法调用 _currentSubject
ReplaySubject。
观察者订阅了嵌套 ReplaySubjects 的串联投影(保存在 _concatenatedSubjects
中)。因为 _subjects
的缓冲区大小只有 1,所以新订阅者只会获取最近的 ReplaySubject
之后的事件。
每当我们需要“清除缓冲区”时,现有的 _currentSubject
是 OnCompleted
并且新的 ReplaySubject 添加到 _subjects
并成为新的 _currentSubject
。
增强功能
根据@Brandon 的建议,我创建了一个 RollingReplaySubject
版本,它使用 TimeSpan
或输入流来指示缓冲区清除。我在这里为此创建了一个要点:https://gist.github.com/james-world/c46f09f32e2d4f338b07
很可能你已经有一个Observable数据源,在这种情况下,这里有另一种解决方案。这个使用现有 RX 构造的组合,而不是构建你自己的 ISubject,我个人对此持谨慎态度。
public class ClearableReplaySubject<TSource, TClearTrigger> : IConnectableObservable<TSource>
{
private readonly IConnectableObservable<IObservable<TSource>> _underlying;
private readonly SerialDisposable _replayConnectDisposable = new SerialDisposable();
public ClearableReplaySubject(IObservable<TSource> src, IObservable<TClearTrigger> clearTrigger)
{
_underlying = clearTrigger.Select(_ => Unit.Default).StartWith(Unit.Default)
.Select(_ =>
{
var underlyingReplay = src.Replay();
_replayConnectDisposable.Disposable = underlyingReplay.Connect();
return underlyingReplay;
})
.Replay(1);
}
public IDisposable Subscribe(IObserver<TSource> observer)
{
return _underlying.Switch().Subscribe(observer);
}
public IDisposable Connect()
{
return new CompositeDisposable(_underlying.Connect(), _replayConnectDisposable.Disposable);
}
}
如果您将以下扩展方法添加到您的 ObservableEx:
public static class ObservableEx
{
public static IConnectableObservable<TItem> ReplayWithReset<TItem, TReset>(this IObservable<TItem> src, IObservable<TReset> resetTrigger)
{
return new ClearableReplaySubject<TItem, TReset>(src, resetTrigger);
}
}
然后您可以使用您的源并添加 .ReplayWithReset(...) 和您的重置触发器 Observable。这可以是计时器或其他任何东西。
var replay = sourceObservable.ReplayWithReset(triggerObservable);
var connection = replay.Connect();
连接的行为方式与重播相同。
好吧,我不知道 c#,但我设法在重播主题 rxdart 中完成了它。
至于 replaysubject 它使用队列来缓存事件所以我
修改重播主题 class.
- 我改变了所有队列
列出
- 添加了 onRemove 方法,该方法将从
chached 列表。
原始重播主题:
class ReplaySubject<T> extends Subject<T> implements ReplayStream<T> {
final Queue<T> _queue;
final int _maxSize;
/// Constructs a [ReplaySubject], optionally pass handlers for
/// [onListen], [onCancel] and a flag to handle events [sync].
///
/// See also [StreamController.broadcast]
factory ReplaySubject({
int maxSize,
void Function() onListen,
void Function() onCancel,
bool sync = false,
}) {
// ignore: close_sinks
final controller = StreamController<T>.broadcast(
onListen: onListen,
onCancel: onCancel,
sync: sync,
);
final queue = Queue<T>();
return ReplaySubject<T>._(
controller,
Rx.defer<T>(
() => controller.stream.startWithMany(queue.toList(growable: false)),
reusable: true,
),
queue,
maxSize,
);
}
ReplaySubject._(
StreamController<T> controller,
Stream<T> stream,
this._queue,
this._maxSize,
) : super(controller, stream);
@override
void onAdd(T event) {
if (_queue.length == _maxSize) {
_queue.removeFirst();
}
_queue.add(event);
}
@override
List<T> get values => _queue.toList(growable: false);
}
修改回放主题:
class ModifiedReplaySubject<T> extends Subject<T> implements ReplayStream<T> {
final List<T> _list;
final int _maxSize;
/// Constructs a [ModifiedReplaySubject], optionally pass handlers for
/// [onListen], [onCancel] and a flag to handle events [sync].
///
/// See also [StreamController.broadcast]
factory ModifiedReplaySubject({
int maxSize,
void Function() onListen,
void Function() onCancel,
bool sync = false,
}) {
// ignore: close_sinks
final controller = StreamController<T>.broadcast(
onListen: onListen,
onCancel: onCancel,
sync: sync,
);
final queue = List<T>();
return ModifiedReplaySubject<T>._(
controller,
Rx.defer<T>(
() => controller.stream.startWithMany(queue.toList(growable: false)),
reusable: true,
),
queue,
maxSize,
);
}
ModifiedReplaySubject._(
StreamController<T> controller,
Stream<T> stream,
this._list,
this._maxSize,
) : super(controller, stream);
@override
void onAdd(T event) {
if (_list.length == _maxSize) {
_list.removeAt(0);
}
_list.add(event);
}
void onRemove(T event) {
_list.remove(event);
}
@override
List<T> get values => _list.toList(growable: false);
}
如何清除 ReplaySubject
上的缓冲区?
我需要定期清除缓冲区(在我的情况下作为一天结束的事件)以防止 ReplaySubject
不断增长并最终吃掉所有内存。
理想情况下,我希望保持不变 ReplaySubject
,因为客户订阅仍然很好。
ReplaySubject
不提供清除缓冲区的方法,但有几种重载以不同方式限制其缓冲区:
- 最多
TimeSpan
项保留 - 最大项目数
- 上述的组合,只要满足任一条件就会掉落物品。
一个可清除的 ReplaySubject
这是一个非常有趣的问题 - 我决定看看实现 ReplaySubject
你 can clear 的变体是多么容易 - 使用现有的主题和运算符(因为它们非常坚固)。事实证明它相当简单。
我已经 运行 通过内存分析器检查它是否正确。调用 Clear()
刷新缓冲区,否则它就像一个普通的无界 ReplaySubject
:
public class RollingReplaySubject<T> : ISubject<T>
{
private readonly ReplaySubject<IObservable<T>> _subjects;
private readonly IObservable<T> _concatenatedSubjects;
private ISubject<T> _currentSubject;
public RollingReplaySubject()
{
_subjects = new ReplaySubject<IObservable<T>>(1);
_concatenatedSubjects = _subjects.Concat();
_currentSubject = new ReplaySubject<T>();
_subjects.OnNext(_currentSubject);
}
public void Clear()
{
_currentSubject.OnCompleted();
_currentSubject = new ReplaySubject<T>();
_subjects.OnNext(_currentSubject);
}
public void OnNext(T value)
{
_currentSubject.OnNext(value);
}
public void OnError(Exception error)
{
_currentSubject.OnError(error);
}
public void OnCompleted()
{
_currentSubject.OnCompleted();
_subjects.OnCompleted();
// a quick way to make the current ReplaySubject unreachable
// except to in-flight observers, and not hold up collection
_currentSubject = new Subject<T>();
}
public IDisposable Subscribe(IObserver<T> observer)
{
return _concatenatedSubjects.Subscribe(observer);
}
}
遵守通常的规则(与任何 Subject
一样)并且不要同时调用此 class 上的方法 - 包括 Clear()
。如果需要,您可以简单地添加同步锁。
它通过在主 ReplaySubject 中嵌套一系列 ReplaySubjects 来工作。外层 ReplaySubject (_subjects
) 保存了正好一个内层 ReplaySubject (_currentSubject
) 的缓冲区,并在构造时填充。
OnXXX
方法调用 _currentSubject
ReplaySubject。
观察者订阅了嵌套 ReplaySubjects 的串联投影(保存在 _concatenatedSubjects
中)。因为 _subjects
的缓冲区大小只有 1,所以新订阅者只会获取最近的 ReplaySubject
之后的事件。
每当我们需要“清除缓冲区”时,现有的 _currentSubject
是 OnCompleted
并且新的 ReplaySubject 添加到 _subjects
并成为新的 _currentSubject
。
增强功能
根据@Brandon 的建议,我创建了一个 RollingReplaySubject
版本,它使用 TimeSpan
或输入流来指示缓冲区清除。我在这里为此创建了一个要点:https://gist.github.com/james-world/c46f09f32e2d4f338b07
很可能你已经有一个Observable数据源,在这种情况下,这里有另一种解决方案。这个使用现有 RX 构造的组合,而不是构建你自己的 ISubject,我个人对此持谨慎态度。
public class ClearableReplaySubject<TSource, TClearTrigger> : IConnectableObservable<TSource>
{
private readonly IConnectableObservable<IObservable<TSource>> _underlying;
private readonly SerialDisposable _replayConnectDisposable = new SerialDisposable();
public ClearableReplaySubject(IObservable<TSource> src, IObservable<TClearTrigger> clearTrigger)
{
_underlying = clearTrigger.Select(_ => Unit.Default).StartWith(Unit.Default)
.Select(_ =>
{
var underlyingReplay = src.Replay();
_replayConnectDisposable.Disposable = underlyingReplay.Connect();
return underlyingReplay;
})
.Replay(1);
}
public IDisposable Subscribe(IObserver<TSource> observer)
{
return _underlying.Switch().Subscribe(observer);
}
public IDisposable Connect()
{
return new CompositeDisposable(_underlying.Connect(), _replayConnectDisposable.Disposable);
}
}
如果您将以下扩展方法添加到您的 ObservableEx:
public static class ObservableEx
{
public static IConnectableObservable<TItem> ReplayWithReset<TItem, TReset>(this IObservable<TItem> src, IObservable<TReset> resetTrigger)
{
return new ClearableReplaySubject<TItem, TReset>(src, resetTrigger);
}
}
然后您可以使用您的源并添加 .ReplayWithReset(...) 和您的重置触发器 Observable。这可以是计时器或其他任何东西。
var replay = sourceObservable.ReplayWithReset(triggerObservable);
var connection = replay.Connect();
连接的行为方式与重播相同。
好吧,我不知道 c#,但我设法在重播主题 rxdart 中完成了它。 至于 replaysubject 它使用队列来缓存事件所以我 修改重播主题 class.
- 我改变了所有队列 列出
- 添加了 onRemove 方法,该方法将从 chached 列表。
原始重播主题:
class ReplaySubject<T> extends Subject<T> implements ReplayStream<T> {
final Queue<T> _queue;
final int _maxSize;
/// Constructs a [ReplaySubject], optionally pass handlers for
/// [onListen], [onCancel] and a flag to handle events [sync].
///
/// See also [StreamController.broadcast]
factory ReplaySubject({
int maxSize,
void Function() onListen,
void Function() onCancel,
bool sync = false,
}) {
// ignore: close_sinks
final controller = StreamController<T>.broadcast(
onListen: onListen,
onCancel: onCancel,
sync: sync,
);
final queue = Queue<T>();
return ReplaySubject<T>._(
controller,
Rx.defer<T>(
() => controller.stream.startWithMany(queue.toList(growable: false)),
reusable: true,
),
queue,
maxSize,
);
}
ReplaySubject._(
StreamController<T> controller,
Stream<T> stream,
this._queue,
this._maxSize,
) : super(controller, stream);
@override
void onAdd(T event) {
if (_queue.length == _maxSize) {
_queue.removeFirst();
}
_queue.add(event);
}
@override
List<T> get values => _queue.toList(growable: false);
}
修改回放主题:
class ModifiedReplaySubject<T> extends Subject<T> implements ReplayStream<T> {
final List<T> _list;
final int _maxSize;
/// Constructs a [ModifiedReplaySubject], optionally pass handlers for
/// [onListen], [onCancel] and a flag to handle events [sync].
///
/// See also [StreamController.broadcast]
factory ModifiedReplaySubject({
int maxSize,
void Function() onListen,
void Function() onCancel,
bool sync = false,
}) {
// ignore: close_sinks
final controller = StreamController<T>.broadcast(
onListen: onListen,
onCancel: onCancel,
sync: sync,
);
final queue = List<T>();
return ModifiedReplaySubject<T>._(
controller,
Rx.defer<T>(
() => controller.stream.startWithMany(queue.toList(growable: false)),
reusable: true,
),
queue,
maxSize,
);
}
ModifiedReplaySubject._(
StreamController<T> controller,
Stream<T> stream,
this._list,
this._maxSize,
) : super(controller, stream);
@override
void onAdd(T event) {
if (_list.length == _maxSize) {
_list.removeAt(0);
}
_list.add(event);
}
void onRemove(T event) {
_list.remove(event);
}
@override
List<T> get values => _list.toList(growable: false);
}