Roland Pheasant 使用 DynamicData 实现尾部方法
Tail method implementation with DynamicData by Roland Pheasant
我试图弄清楚如何使用 DynamicData 库。我需要制作处理源更改并仅传递最后 n 个更改的方法。它可以命名为 Tail()。包中有一个名为 Top() 的方法,它使用 IVirtualRequest 的。我写了基于这个功能的例子:
public static class Extensions
{
public static IObservable<IChangeSet<T>> Tail<T>([NotNull] this IObservable<IChangeSet<T>> source,
int numberOfItems)
{
var request = new TailRequest<T>(source, numberOfItems);
return source.Virtualise(Observable.Return(request));
}
}
public class TailRequest<T> : IVirtualRequest, IDisposable
{
private readonly IDisposable subscription;
private int _count;
public int Size { get; }
public int StartIndex => _count > Size ? _count - Size : 0;
public TailRequest(IObservable<IChangeSet<T>> source, int numberOfItems)
{
//how to dispose this from outside???
subscription = source.Subscribe(RefreshStartIndex);
Size = numberOfItems;
}
private void RefreshStartIndex(IChangeSet<T> changeSet)
{
_count += changeSet.Adds;
_count -= changeSet.Removes;
}
public void Dispose()
{
subscription.Dispose();
}
}
但是我不明白我应该如何处理来自外部的这个请求,如果我使用这样的方法:
SourceList<Message> sourceList = new SourceList<Message>();
var subscription = _sourceList.Connect()
.Tail(15)
.ObserveOn(RxApp.MainThreadScheduler)
.Bind(Messages)
.Subscribe();
我认为 subscription.Dispose() 不会处理我的 TailRequest。
或者有更好的解决方案吗?
这样就可以了
public static IObservable<IChangeSet<T>> Tail<T>([NotNull] this IObservable<IChangeSet<T>> source, int numberOfItems)
{
return Observable.Create<IChangeSet<T>>(observer =>
{
var request = new TailRequest<T>(source, numberOfItems);
return new CompositeDisposable
(
request,
source.Virtualise(Observable.Return(request)).SubscribeSafe(observer)
);
});
Observable.Create returns 中的函数是一个一次性的,用于清理由 observable 创建的任何资源。当消费订阅者被销毁时,函数中创建的内部一次性也被销毁。
我试图弄清楚如何使用 DynamicData 库。我需要制作处理源更改并仅传递最后 n 个更改的方法。它可以命名为 Tail()。包中有一个名为 Top() 的方法,它使用 IVirtualRequest 的。我写了基于这个功能的例子:
public static class Extensions
{
public static IObservable<IChangeSet<T>> Tail<T>([NotNull] this IObservable<IChangeSet<T>> source,
int numberOfItems)
{
var request = new TailRequest<T>(source, numberOfItems);
return source.Virtualise(Observable.Return(request));
}
}
public class TailRequest<T> : IVirtualRequest, IDisposable
{
private readonly IDisposable subscription;
private int _count;
public int Size { get; }
public int StartIndex => _count > Size ? _count - Size : 0;
public TailRequest(IObservable<IChangeSet<T>> source, int numberOfItems)
{
//how to dispose this from outside???
subscription = source.Subscribe(RefreshStartIndex);
Size = numberOfItems;
}
private void RefreshStartIndex(IChangeSet<T> changeSet)
{
_count += changeSet.Adds;
_count -= changeSet.Removes;
}
public void Dispose()
{
subscription.Dispose();
}
}
但是我不明白我应该如何处理来自外部的这个请求,如果我使用这样的方法:
SourceList<Message> sourceList = new SourceList<Message>();
var subscription = _sourceList.Connect()
.Tail(15)
.ObserveOn(RxApp.MainThreadScheduler)
.Bind(Messages)
.Subscribe();
我认为 subscription.Dispose() 不会处理我的 TailRequest。 或者有更好的解决方案吗?
这样就可以了
public static IObservable<IChangeSet<T>> Tail<T>([NotNull] this IObservable<IChangeSet<T>> source, int numberOfItems)
{
return Observable.Create<IChangeSet<T>>(observer =>
{
var request = new TailRequest<T>(source, numberOfItems);
return new CompositeDisposable
(
request,
source.Virtualise(Observable.Return(request)).SubscribeSafe(observer)
);
});
Observable.Create returns 中的函数是一个一次性的,用于清理由 observable 创建的任何资源。当消费订阅者被销毁时,函数中创建的内部一次性也被销毁。