如何将 "reload" 和 IsLoading 状态添加到二级 Observable
How to add "reload" and IsLoading status to 2nd level Observable
我觉得我在重新发明轮子,所以我最好问一下。
给予
- 我有一个
Observable<T> source
- 和
Task LoadAsync<T>(T value)
方法
何时
我使用 Select/Switch 模式在源发出时调用 LoadMethod
observable
.Select(value => Observable.FromAsync(cancellationToken => LoadAsync(value, cancellationToken)))
.Switch()
.Subscribe();
然后
- 如何添加重新加载功能?
- 如何报告 IsLoading 状态:
LoadAsync
是否为 运行
- 如何在
source
完成时取消 LoadAsync
我想创建一些可重复使用的方法,或 class,以实现对 #1 和 #2 的回答。
到目前为止我有这个:https://dotnetfiddle.net/0zPhBE
public class ReactiveLoader<T> : IDisposable
{
private readonly BehaviorSubject<bool> _isLoading = new(false);
private readonly Subject<Unit> _completes = new();
private readonly Subject<T> _reloads = new Subject<T>();
private readonly IDisposable _subscription;
public bool IsLoading => _isLoading.Value;
public IObservable<bool> IsLoadingObservable => _isLoading.Skip(1).DistinctUntilChanged(); //Not nice
public ReactiveLoader(IObservable<T> observable, Func<T, CancellationToken, Task> handler)
{
_subscription = observable
.Finally(() => //Not nice
{
_completes.OnNext(Unit.Default);
})
.Merge(_reloads)
.Do(_ => _isLoading.OnNext(true))
.Select(value => Observable.FromAsync(cancellationToken => handler(value, cancellationToken)))
.Switch()
.Do(_ => _isLoading.OnNext(false))
.TakeUntil(_completes) //cancels loading when observable completes
.Subscribe();
}
public void Reload()
{
_reloads.OnNext(??); //needs last value of source
}
public void Dispose()
{
_completes.OnNext(Unit.Default);
_subscription.Dispose();
}
}
这是一种方法:
IObservable<bool> sequence = source.Publish(published => published
.CombineLatest(_reloads, (x, _) => x)
.Select(x => Observable.FromAsync(ct => LoadAsync(x, ct)).Select(_ => false).Prepend(true))
.Switch()
.Do(_isLoading)
.TakeUntil(published.LastOrDefaultAsync()));
每次 _reloads
发出信号时,CombineLatest
运算符将重新发出最新值。
.Select(_ => false).Prepend(true)
将内部可观察对象从 IObservable<Unit>
转换为发出加载状态信号的 IObservable<bool>
。
TakeUntil(published.LastOrDefaultAsync())
将在 source
终止时立即终止序列(无需等待任何挂起的 LoadAsync
操作)。
我觉得我在重新发明轮子,所以我最好问一下。
给予
- 我有一个
Observable<T> source
- 和
Task LoadAsync<T>(T value)
方法
何时
我使用 Select/Switch 模式在源发出时调用 LoadMethod
observable .Select(value => Observable.FromAsync(cancellationToken => LoadAsync(value, cancellationToken))) .Switch() .Subscribe();
然后
- 如何添加重新加载功能?
- 如何报告 IsLoading 状态:
LoadAsync
是否为 运行 - 如何在
source
完成时取消LoadAsync
我想创建一些可重复使用的方法,或 class,以实现对 #1 和 #2 的回答。
到目前为止我有这个:https://dotnetfiddle.net/0zPhBE
public class ReactiveLoader<T> : IDisposable
{
private readonly BehaviorSubject<bool> _isLoading = new(false);
private readonly Subject<Unit> _completes = new();
private readonly Subject<T> _reloads = new Subject<T>();
private readonly IDisposable _subscription;
public bool IsLoading => _isLoading.Value;
public IObservable<bool> IsLoadingObservable => _isLoading.Skip(1).DistinctUntilChanged(); //Not nice
public ReactiveLoader(IObservable<T> observable, Func<T, CancellationToken, Task> handler)
{
_subscription = observable
.Finally(() => //Not nice
{
_completes.OnNext(Unit.Default);
})
.Merge(_reloads)
.Do(_ => _isLoading.OnNext(true))
.Select(value => Observable.FromAsync(cancellationToken => handler(value, cancellationToken)))
.Switch()
.Do(_ => _isLoading.OnNext(false))
.TakeUntil(_completes) //cancels loading when observable completes
.Subscribe();
}
public void Reload()
{
_reloads.OnNext(??); //needs last value of source
}
public void Dispose()
{
_completes.OnNext(Unit.Default);
_subscription.Dispose();
}
}
这是一种方法:
IObservable<bool> sequence = source.Publish(published => published
.CombineLatest(_reloads, (x, _) => x)
.Select(x => Observable.FromAsync(ct => LoadAsync(x, ct)).Select(_ => false).Prepend(true))
.Switch()
.Do(_isLoading)
.TakeUntil(published.LastOrDefaultAsync()));
每次 _reloads
发出信号时,CombineLatest
运算符将重新发出最新值。
.Select(_ => false).Prepend(true)
将内部可观察对象从 IObservable<Unit>
转换为发出加载状态信号的 IObservable<bool>
。
TakeUntil(published.LastOrDefaultAsync())
将在 source
终止时立即终止序列(无需等待任何挂起的 LoadAsync
操作)。