在派生属性中使用调度程序以获得响应式 UI 的合适方法是什么?

What's the appropriate way to use schedulers in derived properties to have a responsive UI?

我很难找到在我的 ViewModel 中安排长运行反应属性"getters"的正确方法。

This excerpt from Intro to RX 准确描述了我想做的事情:

  • respond to some sort of user action
  • do work on a background thread
  • pass the result back to the UI thread
  • update the UI

只有在这种情况下,除了用户交互之外,我想对其他属性的更改做出反应。

下面是我用来从原始 属性 获取派生 属性 的通用模板(在实际代码中,存在级联派生属性链)。

在 Reactive ViewModel(继承自 ReactiveObject)中,我已经拥有一些派生自其他属性的属性。例如,当Original改变时,Derived被重新计算。

    public TOriginal Original
    {
        get { return _original; }
        set { this.RaiseAndSetIfChanged(ref _original, value); }
    }
    TOriginal _original;


    public TDerived Derived { get { return _derived.Value; } }
    readonly ObservableAsPropertyHelper<double[,]> _derived;


    this.WhenAnyValue(x => x.Original)
        .Where(originalValue => originalValue != null)
        // ObserveOn? SubscribeOn? Which scheduler?
        .Select(derivedValue => LongRunningCalculation(originalValue))
        // Same thing here: ObserveOn? SubscribeOn? Which scheduler? 
        .ToProperty(this, x => x.Derived, out _derived); // should I use the `scheduler:` in this method?

我的问题是:我不知道应该如何组合这些不同的 "design choices" 以获得我想要的响应 UI:

最伤脑筋的事实是,使用 一些 组合,计算正常,但阻止 UI,而使用其他一些组合,值是异步计算的,UI 稍微少一点阻塞,但有时派生值的 部分 不可用(例如,在项目集合中)!

抱歉,如果我要求太多,但我没有在文档中找到任何权威的预期方法来完成我需要的事情。

在可能阻塞 UI 的 Select 之前观察 TaskPoolScheduler。 在 ToProperty 之前观察 MainThreadScheduler。

  this.WhenAnyValue(x => x.Original)
        .Where(originalValue => originalValue != null)
        .ObserveOn(TaskPoolScheduler.Default)
        .Select(derivedValue => LongRunningCalculation(originalValue))
        .ObserveOn(RxApp.MainThreadScheduler)
        .ToProperty(this, x => x.Derived, out _derived); 

另外

人们对 SubscribeOn 的实际作用感到非常困惑。有很多解释。例如,在此处的另一个答案中给出

SubscribeOn moves up the observable chain to the top and make sure the observable produces values on the given scheduler

这不是真的。查看 RX 代码库中 SubscribeOn 的实现很有启发意义。你必须跳过几层抽象才能到达那里,但最终你会找到。

public static IObservable<TSource> 
    SubscribeOn<TSource>
   ( IObservable<TSource> source
   , IScheduler scheduler
   )
{
  if (source == null)
    throw new ArgumentNullException("source");
  if (scheduler == null)
    throw new ArgumentNullException("scheduler");
  return (IObservable<TSource>) new AnonymousObservable<TSource>((Func<IObserver<TSource>, IDisposable>) (observer =>
  {
    SingleAssignmentDisposable assignmentDisposable = new SingleAssignmentDisposable();
    SerialDisposable d = new SerialDisposable();
    d.Disposable = (IDisposable) assignmentDisposable;
    assignmentDisposable.Disposable = scheduler.Schedule((Action) (() => d.Disposable = (IDisposable) new ScheduledDisposable(scheduler, source.SubscribeSafe<TSource>(observer))));
    return (IDisposable) d;
  }));
}

唯一要做的就是确保在指定的调度程序上调用 source 上的 Subscribe 方法,并确保由相同 Subscribe 方法也会在指定的调度程序上调用。这对下游代码的影响是多种多样的。

例如

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace SubscribeOnVsObserveOn
{
    class Program
    {
        static readonly Subject<object> EventsSubject = new Subject<object>();

        private static readonly IObservable<object> Events = Observable.Create<object>
            ( observer =>
            {
                Info( "Subscribing"  );
                return EventsSubject.Subscribe( observer );
            } );

        public static void Info(string msg)
        {
            var currentThread = Thread.CurrentThread;
            var currentThreadName = string.IsNullOrWhiteSpace( currentThread.Name ) ? "<no name>" : currentThread.Name;
            Console.WriteLine
                ( $"Thread Id {currentThread.ManagedThreadId} {currentThreadName} - " + msg );
        }

        public static void  Foo()
        {
            Thread.CurrentThread.Name = "Main Thread";

            Info( "Starting"  );

            void OnNext(object o) => Info( $"Received {o}" );

            void Notify(object obj)
            {
                Info( $"Sending {obj}"  );
                EventsSubject.OnNext( obj );
            }

            void StartAndSend(object o, string threadName)
            {
                var thread = new Thread(Notify);
                thread.Name = threadName;
                thread.Start(o);
                thread.Join();
            }

            Notify(1);

            Console.WriteLine("=============================================" );
            Console.WriteLine("Subscribe Only" );
            Console.WriteLine("=============================================" );
            using (Events.Subscribe(OnNext))
            {
                Thread.Sleep( 200 );
                StartAndSend(2, "A");
                StartAndSend(3, "B");
            }

            Console.WriteLine("=============================================" );
            Console.WriteLine("Subscribe With SubscribeOn(CurrentThreadScheduler)" );
            Console.WriteLine("=============================================" );
            using (Events.SubscribeOn( CurrentThreadScheduler.Instance ).Subscribe(OnNext))
            {
                Thread.Sleep( 200 );
                StartAndSend(2, "A");
                StartAndSend(3, "B");
            }

            Console.WriteLine("=============================================" );
            Console.WriteLine("Subscribe With SubscribeOn(ThreadPool)" );
            Console.WriteLine("=============================================" );
            using (Events.SubscribeOn( ThreadPoolScheduler.Instance ).Subscribe(OnNext))
            {
                Thread.Sleep( 200 );
                StartAndSend(2, "A");
                StartAndSend(3, "B");
            }

            Console.WriteLine("=============================================" );
            Console.WriteLine("Subscribe With SubscribeOn(NewThread)" );
            Console.WriteLine("=============================================" );
            using (Events.SubscribeOn( NewThreadScheduler.Default ).Subscribe(OnNext))
            {
                Thread.Sleep( 200 );
                StartAndSend(2, "A");
                StartAndSend(3, "B");
            }

            Console.WriteLine("=============================================" );
            Console.WriteLine("Subscribe With SubscribeOn(NewThread) + ObserveOn" );
            Console.WriteLine("=============================================" );
            using (Events.SubscribeOn( NewThreadScheduler.Default ).ObserveOn(TaskPoolScheduler.Default  ).Subscribe(OnNext))
            {
                Thread.Sleep( 200 );
                StartAndSend(2, "A");
                StartAndSend(3, "B");
            }
        }




        static void Main(string[] args)
        {
            Foo();
            Console.WriteLine( "Press Any Key" );
            Console.ReadLine();
        }
    }
}

生成以下输出

Thread Id 1 Main Thread - Starting
Thread Id 1 Main Thread - Sending 1
=============================================
Subscribe Only
=============================================
Thread Id 1 Main Thread - Subscribing
Thread Id 4 A - Sending 2
Thread Id 4 A - Received 2
Thread Id 5 B - Sending 3
Thread Id 5 B - Received 3
=============================================
Subscribe With SubscribeOn(CurrentThreadScheduler)
=============================================
Thread Id 1 Main Thread - Subscribing
Thread Id 6 A - Sending 2
Thread Id 6 A - Received 2
Thread Id 7 B - Sending 3
Thread Id 7 B - Received 3
=============================================
Subscribe With SubscribeOn(ThreadPool)
=============================================
Thread Id 8 <no name> - Subscribing
Thread Id 10 A - Sending 2
Thread Id 10 A - Received 2
Thread Id 11 B - Sending 3
Thread Id 11 B - Received 3
=============================================
Subscribe With SubscribeOn(NewThread)
=============================================
Thread Id 12 <no name> - Subscribing
Thread Id 13 A - Sending 2
Thread Id 13 A - Received 2
Thread Id 14 B - Sending 3
Thread Id 14 B - Received 3
=============================================
Subscribe With SubscribeOn(NewThread) + ObserveOn
=============================================
Thread Id 16 <no name> - Subscribing
Thread Id 17 A - Sending 2
Thread Id 19 B - Sending 3
Thread Id 18 <no name> - Received 2
Thread Id 18 <no name> - Received 3
Press Any Key

要点是 SubscribeOn 既不能强制事件的发送或接收在特定的调度程序上进行。它只能强制 Subscribe 方法在特定的调度程序上发生。此可能可能不具有下游/上游影响。

调度程序

在 Rx.NET 中有一些调度程序,包括 WPF 独有的特殊调度程序。

  • TaskPoolScheduler 运行s 任务池代码。这有点像 Task.
  • 中的 运行ning 代码
  • NewThreadScheduler 生成一个新线程以 运行 上的代码。一般不要使用这个运算符,除非你知道你 "need" 它(你几乎从不使用)
  • DispatcherScheduler 运行s 代码在 UI 线程上。当您要在 VM
  • 中设置属性时使用它

RxUI 带来了两个与平台无关的调度程序抽象。无论您在什么平台上(WPF、UWP、Xamarin.iOS、Xamarin.Android),RxApp.MainThreadScheduler 将始终引用 UI 线程调度程序,而 RxApp.TaskPoolScheduler 将引用类似于后台线程的内容。

如果您想保持简单,只需使用 RxApp 调度程序; RxApp.MainThreadScheduler 用于 UI 东西,RxApp.TaskPoolScheduler 用于 background/heavy 值班东西。

ObserveOn/SubscribeOn

名称 SubscribeOn() 有点令人困惑,因为它不直接影响 Subscribe() 方法。 SubscribeOn() 决定 observable 将在哪个调度器上启动; original/first 订阅将在哪个调度器上完成(而不是 Subscribe() 方法将在哪个调度器上执行)。我喜欢认为 SubsribeOn() 将 observable 链向上移动到顶部,并确保 observable 在给定的调度程序上产生值。

一些运营商让您指定他们应该 运行 在哪个调度程序上。当他们这样做时,你应该总是更喜欢传递一个调度程序,这样你就知道他们将在哪里工作并防止他们潜在地阻塞 UI thead(尽管他们不应该)。 SubsribeOn() 是一种 "hack" 用于不允许您指定调度程序的可观察对象。如果您使用 SubscribeOn(),但操作员指定了一个调度程序,则来自操作员的信号将在操作员调度程序上发出,而不是您在 SubscribeOn() 中指定的调度程序。

ObserveOn()SubscribeOn() 的功能大致相同,但它确实做到了 "from this point onwards"。 ObserveOn() 之后的运算符和代码将在给定 ObserveOn() 的调度程序上执行。我喜欢认为 ObserveOn() 意味着 "change thread to this one".

做繁重的工作

如果您要进行繁重的工作,请将其放入一个函数中并调用该函数,就像您在 LongRunningCalculation() 中所做的那样。您可以在 Select() 之前使用 ObserveOn(RxApp.TaskPoolScheduler) 并在其之后使用 ObserveOn(RxApp.MainThreadScheduler,但我更喜欢将 Observable.Start()SelectMany().[=47= 结合使用]

Observable.Start() 基本上是 Observable.Return() 函数:"Give me the result of this function as an observable." 您还可以指定它应该调用函数的调度程序。

SelectMany() 确保我们得到可观察对象的结果,而不是可观察对象本身。 (这有点像 Observable 的 await:"don't execute this next operator before we have the result of this observable")

派生属性

你做的派生 属性 正确。

使用 WhenAnyValue() 获取 属性 的更改并将其通过管道传输到 ToProperty()。您放在中间的运算符可能会在后台线程上工作,从而延迟派生 属性 的设置,但这就是我们有 INotifyPropertyChanged.

的原因

我的看法

以下是我将如何实施您的具体示例:

public TOriginal Original
{
    get { return _original; }
    set { this.RaiseAndSetIfChanged(ref _original, value); }
}
TOriginal _original;


public TDerived Derived { get { return _derived.Value; } }
readonly ObservableAsPropertyHelper<double[,]> _derived;


_derived = this.WhenAnyValue(x => x.Original)
    .Where(originalValue => originalValue != null)
    // Sepcify the scheduler to the operator directly
    .SelectMany(originalValue =>
        Observable.Start(
            () => LongRunningCalculation(originalValue),
            RxApp.TaskPoolScheduler))
    .ObserveOn(RxApp.MainThreadScheduler)
    // I prefer this overload of ToProperty, which returns an ObservableAsPropertyHelper
    .ToProperty(this, x => x.Derived);

我们有一个 Reactive 的 Slack 团队UI,欢迎您加入。您可以通过单击 here

请求邀请