如何将 "reload" 和 IsLoading 状态添加到二级 Observable

How to add "reload" and IsLoading status to 2nd level Observable

我觉得我在重新发明轮子,所以我最好问一下。

给予

何时

然后

  1. 如何添加重新加载功能?
  2. 如何报告 IsLoading 状态:LoadAsync 是否为 运行
  3. 如何在 source 完成时取消 LoadAsync

我想创建一些可重复使用的方法,或 class,以实现对 #1 和 #2 的回答。

到目前为止我有这个:https://dotnetfiddle.net/0zPhBE

public class ReactiveLoader<T> : IDisposable
{
    private readonly BehaviorSubject<bool> _isLoading = new(false);
    private readonly Subject<Unit> _completes = new();
    private readonly Subject<T> _reloads = new Subject<T>();
    private readonly IDisposable _subscription;

    public bool IsLoading => _isLoading.Value;
    public IObservable<bool> IsLoadingObservable => _isLoading.Skip(1).DistinctUntilChanged(); //Not nice

    public ReactiveLoader(IObservable<T> observable, Func<T, CancellationToken, Task> handler)
    {           
        _subscription = observable
            .Finally(() => //Not nice
            {
                 _completes.OnNext(Unit.Default);
            })
            .Merge(_reloads)
            .Do(_ => _isLoading.OnNext(true))
            .Select(value => Observable.FromAsync(cancellationToken => handler(value, cancellationToken)))
            .Switch()
            .Do(_ => _isLoading.OnNext(false))
            .TakeUntil(_completes) //cancels loading when observable completes
            .Subscribe();
    }

    public void Reload()
    {
         _reloads.OnNext(??); //needs last value of source
    }

    public void Dispose()
    {
        _completes.OnNext(Unit.Default);
        _subscription.Dispose();
    }
}

这是一种方法:

IObservable<bool> sequence = source.Publish(published => published
    .CombineLatest(_reloads, (x, _) => x)
    .Select(x => Observable.FromAsync(ct => LoadAsync(x, ct)).Select(_ => false).Prepend(true))
    .Switch()
    .Do(_isLoading)
    .TakeUntil(published.LastOrDefaultAsync()));

每次 _reloads 发出信号时,CombineLatest 运算符将重新发出最新值。

.Select(_ => false).Prepend(true) 将内部可观察对象从 IObservable<Unit> 转换为发出加载状态信号的 IObservable<bool>

TakeUntil(published.LastOrDefaultAsync()) 将在 source 终止时立即终止序列(无需等待任何挂起的 LoadAsync 操作)。