在派生属性中使用调度程序以获得响应式 UI 的合适方法是什么?
What's the appropriate way to use schedulers in derived properties to have a responsive UI?
我很难找到在我的 ViewModel 中安排长运行反应属性"getters"的正确方法。
This excerpt from Intro to RX 准确描述了我想做的事情:
- respond to some sort of user action
- do work on a background thread
- pass the result back to the UI thread
- update the UI
只有在这种情况下,除了用户交互之外,我想对其他属性的更改做出反应。
下面是我用来从原始 属性 获取派生 属性 的通用模板(在实际代码中,存在级联派生属性链)。
在 Reactive ViewModel(继承自 ReactiveObject)中,我已经拥有一些派生自其他属性的属性。例如,当Original
改变时,Derived
被重新计算。
public TOriginal Original
{
get { return _original; }
set { this.RaiseAndSetIfChanged(ref _original, value); }
}
TOriginal _original;
public TDerived Derived { get { return _derived.Value; } }
readonly ObservableAsPropertyHelper<double[,]> _derived;
this.WhenAnyValue(x => x.Original)
.Where(originalValue => originalValue != null)
// ObserveOn? SubscribeOn? Which scheduler?
.Select(derivedValue => LongRunningCalculation(originalValue))
// Same thing here: ObserveOn? SubscribeOn? Which scheduler?
.ToProperty(this, x => x.Derived, out _derived); // should I use the `scheduler:` in this method?
我的问题是:我不知道应该如何组合这些不同的 "design choices" 以获得我想要的响应 UI:
- 使用哪个调度程序?我看过
RxApp.TaskpoolScheduler
、RxApp.MainThreadScheduler
、NewThreadScheduler.Default
以及其他可能的示例。
- 什么时候使用
SubscribeOn
与 ObserveOnDispatcher
的 ObserveOn
或 ToProperty
的 scheduler:
参数?
- 顺序有区别吗?我已经在
Select
运算符之前和之后放置了重新调度方法,但我不太确定。坦率地说,我什至不确定是否需要 Select
。
- 我看过一些将
Binding.IsAsync
设置为true
的例子,但我试过了,似乎没有太大区别,但同样,也许是因为其他因素。
- 这里
SynchronizationContext
和ThreadPriority
的概念相关吗?有没有办法在显示的代码中配置它们?
- 我应该使用
ReactiveCommand
还是其他 ReactiveUI class?
最伤脑筋的事实是,使用 一些 组合,计算正常,但阻止 UI,而使用其他一些组合,值是异步计算的,UI 稍微少一点阻塞,但有时派生值的 部分 不可用(例如,在项目集合中)!
抱歉,如果我要求太多,但我没有在文档中找到任何权威的预期方法来完成我需要的事情。
在可能阻塞 UI 的 Select
之前观察 TaskPoolScheduler。
在 ToProperty
之前观察 MainThreadScheduler。
this.WhenAnyValue(x => x.Original)
.Where(originalValue => originalValue != null)
.ObserveOn(TaskPoolScheduler.Default)
.Select(derivedValue => LongRunningCalculation(originalValue))
.ObserveOn(RxApp.MainThreadScheduler)
.ToProperty(this, x => x.Derived, out _derived);
另外
人们对 SubscribeOn
的实际作用感到非常困惑。有很多解释。例如,在此处的另一个答案中给出
SubscribeOn moves up the observable chain to the top and make sure the observable produces values on the given scheduler
这不是真的。查看 RX 代码库中 SubscribeOn
的实现很有启发意义。你必须跳过几层抽象才能到达那里,但最终你会找到。
public static IObservable<TSource>
SubscribeOn<TSource>
( IObservable<TSource> source
, IScheduler scheduler
)
{
if (source == null)
throw new ArgumentNullException("source");
if (scheduler == null)
throw new ArgumentNullException("scheduler");
return (IObservable<TSource>) new AnonymousObservable<TSource>((Func<IObserver<TSource>, IDisposable>) (observer =>
{
SingleAssignmentDisposable assignmentDisposable = new SingleAssignmentDisposable();
SerialDisposable d = new SerialDisposable();
d.Disposable = (IDisposable) assignmentDisposable;
assignmentDisposable.Disposable = scheduler.Schedule((Action) (() => d.Disposable = (IDisposable) new ScheduledDisposable(scheduler, source.SubscribeSafe<TSource>(observer))));
return (IDisposable) d;
}));
}
唯一要做的就是确保在指定的调度程序上调用 source
上的 Subscribe
方法,并确保由相同 Subscribe
方法也会在指定的调度程序上调用。这对下游代码的影响是多种多样的。
例如
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
namespace SubscribeOnVsObserveOn
{
class Program
{
static readonly Subject<object> EventsSubject = new Subject<object>();
private static readonly IObservable<object> Events = Observable.Create<object>
( observer =>
{
Info( "Subscribing" );
return EventsSubject.Subscribe( observer );
} );
public static void Info(string msg)
{
var currentThread = Thread.CurrentThread;
var currentThreadName = string.IsNullOrWhiteSpace( currentThread.Name ) ? "<no name>" : currentThread.Name;
Console.WriteLine
( $"Thread Id {currentThread.ManagedThreadId} {currentThreadName} - " + msg );
}
public static void Foo()
{
Thread.CurrentThread.Name = "Main Thread";
Info( "Starting" );
void OnNext(object o) => Info( $"Received {o}" );
void Notify(object obj)
{
Info( $"Sending {obj}" );
EventsSubject.OnNext( obj );
}
void StartAndSend(object o, string threadName)
{
var thread = new Thread(Notify);
thread.Name = threadName;
thread.Start(o);
thread.Join();
}
Notify(1);
Console.WriteLine("=============================================" );
Console.WriteLine("Subscribe Only" );
Console.WriteLine("=============================================" );
using (Events.Subscribe(OnNext))
{
Thread.Sleep( 200 );
StartAndSend(2, "A");
StartAndSend(3, "B");
}
Console.WriteLine("=============================================" );
Console.WriteLine("Subscribe With SubscribeOn(CurrentThreadScheduler)" );
Console.WriteLine("=============================================" );
using (Events.SubscribeOn( CurrentThreadScheduler.Instance ).Subscribe(OnNext))
{
Thread.Sleep( 200 );
StartAndSend(2, "A");
StartAndSend(3, "B");
}
Console.WriteLine("=============================================" );
Console.WriteLine("Subscribe With SubscribeOn(ThreadPool)" );
Console.WriteLine("=============================================" );
using (Events.SubscribeOn( ThreadPoolScheduler.Instance ).Subscribe(OnNext))
{
Thread.Sleep( 200 );
StartAndSend(2, "A");
StartAndSend(3, "B");
}
Console.WriteLine("=============================================" );
Console.WriteLine("Subscribe With SubscribeOn(NewThread)" );
Console.WriteLine("=============================================" );
using (Events.SubscribeOn( NewThreadScheduler.Default ).Subscribe(OnNext))
{
Thread.Sleep( 200 );
StartAndSend(2, "A");
StartAndSend(3, "B");
}
Console.WriteLine("=============================================" );
Console.WriteLine("Subscribe With SubscribeOn(NewThread) + ObserveOn" );
Console.WriteLine("=============================================" );
using (Events.SubscribeOn( NewThreadScheduler.Default ).ObserveOn(TaskPoolScheduler.Default ).Subscribe(OnNext))
{
Thread.Sleep( 200 );
StartAndSend(2, "A");
StartAndSend(3, "B");
}
}
static void Main(string[] args)
{
Foo();
Console.WriteLine( "Press Any Key" );
Console.ReadLine();
}
}
}
生成以下输出
Thread Id 1 Main Thread - Starting
Thread Id 1 Main Thread - Sending 1
=============================================
Subscribe Only
=============================================
Thread Id 1 Main Thread - Subscribing
Thread Id 4 A - Sending 2
Thread Id 4 A - Received 2
Thread Id 5 B - Sending 3
Thread Id 5 B - Received 3
=============================================
Subscribe With SubscribeOn(CurrentThreadScheduler)
=============================================
Thread Id 1 Main Thread - Subscribing
Thread Id 6 A - Sending 2
Thread Id 6 A - Received 2
Thread Id 7 B - Sending 3
Thread Id 7 B - Received 3
=============================================
Subscribe With SubscribeOn(ThreadPool)
=============================================
Thread Id 8 <no name> - Subscribing
Thread Id 10 A - Sending 2
Thread Id 10 A - Received 2
Thread Id 11 B - Sending 3
Thread Id 11 B - Received 3
=============================================
Subscribe With SubscribeOn(NewThread)
=============================================
Thread Id 12 <no name> - Subscribing
Thread Id 13 A - Sending 2
Thread Id 13 A - Received 2
Thread Id 14 B - Sending 3
Thread Id 14 B - Received 3
=============================================
Subscribe With SubscribeOn(NewThread) + ObserveOn
=============================================
Thread Id 16 <no name> - Subscribing
Thread Id 17 A - Sending 2
Thread Id 19 B - Sending 3
Thread Id 18 <no name> - Received 2
Thread Id 18 <no name> - Received 3
Press Any Key
要点是 SubscribeOn 既不能强制事件的发送或接收在特定的调度程序上进行。它只能强制 Subscribe
方法在特定的调度程序上发生。此可能或可能不具有下游/上游影响。
调度程序
在 Rx.NET 中有一些调度程序,包括 WPF 独有的特殊调度程序。
TaskPoolScheduler
运行s 任务池代码。这有点像 Task
. 中的 运行ning 代码
NewThreadScheduler
生成一个新线程以 运行 上的代码。一般不要使用这个运算符,除非你知道你 "need" 它(你几乎从不使用)
DispatcherScheduler
运行s 代码在 UI 线程上。当您要在 VM 中设置属性时使用它
RxUI 带来了两个与平台无关的调度程序抽象。无论您在什么平台上(WPF、UWP、Xamarin.iOS、Xamarin.Android),RxApp.MainThreadScheduler
将始终引用 UI 线程调度程序,而 RxApp.TaskPoolScheduler
将引用类似于后台线程的内容。
如果您想保持简单,只需使用 RxApp
调度程序; RxApp.MainThreadScheduler
用于 UI 东西,RxApp.TaskPoolScheduler
用于 background/heavy 值班东西。
ObserveOn/SubscribeOn
名称 SubscribeOn()
有点令人困惑,因为它不直接影响 Subscribe()
方法。 SubscribeOn()
决定 observable 将在哪个调度器上启动; original/first 订阅将在哪个调度器上完成(而不是 Subscribe()
方法将在哪个调度器上执行)。我喜欢认为 SubsribeOn()
将 observable 链向上移动到顶部,并确保 observable 在给定的调度程序上产生值。
一些运营商让您指定他们应该 运行 在哪个调度程序上。当他们这样做时,你应该总是更喜欢传递一个调度程序,这样你就知道他们将在哪里工作并防止他们潜在地阻塞 UI thead(尽管他们不应该)。 SubsribeOn()
是一种 "hack" 用于不允许您指定调度程序的可观察对象。如果您使用 SubscribeOn()
,但操作员指定了一个调度程序,则来自操作员的信号将在操作员调度程序上发出,而不是您在 SubscribeOn()
中指定的调度程序。
ObserveOn()
与 SubscribeOn()
的功能大致相同,但它确实做到了 "from this point onwards"。 ObserveOn()
之后的运算符和代码将在给定 ObserveOn()
的调度程序上执行。我喜欢认为 ObserveOn()
意味着 "change thread to this one".
做繁重的工作
如果您要进行繁重的工作,请将其放入一个函数中并调用该函数,就像您在 LongRunningCalculation()
中所做的那样。您可以在 Select()
之前使用 ObserveOn(RxApp.TaskPoolScheduler)
并在其之后使用 ObserveOn(RxApp.MainThreadScheduler
,但我更喜欢将 Observable.Start()
与 SelectMany()
.[=47= 结合使用]
Observable.Start()
基本上是 Observable.Return()
函数:"Give me the result of this function as an observable." 您还可以指定它应该调用函数的调度程序。
SelectMany()
确保我们得到可观察对象的结果,而不是可观察对象本身。 (这有点像 Observable 的 await
:"don't execute this next operator before we have the result of this observable")
派生属性
你做的派生 属性 正确。
使用 WhenAnyValue()
获取 属性 的更改并将其通过管道传输到 ToProperty()
。您放在中间的运算符可能会在后台线程上工作,从而延迟派生 属性 的设置,但这就是我们有 INotifyPropertyChanged
.
的原因
我的看法
以下是我将如何实施您的具体示例:
public TOriginal Original
{
get { return _original; }
set { this.RaiseAndSetIfChanged(ref _original, value); }
}
TOriginal _original;
public TDerived Derived { get { return _derived.Value; } }
readonly ObservableAsPropertyHelper<double[,]> _derived;
_derived = this.WhenAnyValue(x => x.Original)
.Where(originalValue => originalValue != null)
// Sepcify the scheduler to the operator directly
.SelectMany(originalValue =>
Observable.Start(
() => LongRunningCalculation(originalValue),
RxApp.TaskPoolScheduler))
.ObserveOn(RxApp.MainThreadScheduler)
// I prefer this overload of ToProperty, which returns an ObservableAsPropertyHelper
.ToProperty(this, x => x.Derived);
我们有一个 Reactive 的 Slack 团队UI,欢迎您加入。您可以通过单击 here
请求邀请
我很难找到在我的 ViewModel 中安排长运行反应属性"getters"的正确方法。
This excerpt from Intro to RX 准确描述了我想做的事情:
- respond to some sort of user action
- do work on a background thread
- pass the result back to the UI thread
- update the UI
只有在这种情况下,除了用户交互之外,我想对其他属性的更改做出反应。
下面是我用来从原始 属性 获取派生 属性 的通用模板(在实际代码中,存在级联派生属性链)。
在 Reactive ViewModel(继承自 ReactiveObject)中,我已经拥有一些派生自其他属性的属性。例如,当Original
改变时,Derived
被重新计算。
public TOriginal Original
{
get { return _original; }
set { this.RaiseAndSetIfChanged(ref _original, value); }
}
TOriginal _original;
public TDerived Derived { get { return _derived.Value; } }
readonly ObservableAsPropertyHelper<double[,]> _derived;
this.WhenAnyValue(x => x.Original)
.Where(originalValue => originalValue != null)
// ObserveOn? SubscribeOn? Which scheduler?
.Select(derivedValue => LongRunningCalculation(originalValue))
// Same thing here: ObserveOn? SubscribeOn? Which scheduler?
.ToProperty(this, x => x.Derived, out _derived); // should I use the `scheduler:` in this method?
我的问题是:我不知道应该如何组合这些不同的 "design choices" 以获得我想要的响应 UI:
- 使用哪个调度程序?我看过
RxApp.TaskpoolScheduler
、RxApp.MainThreadScheduler
、NewThreadScheduler.Default
以及其他可能的示例。 - 什么时候使用
SubscribeOn
与ObserveOnDispatcher
的ObserveOn
或ToProperty
的scheduler:
参数? - 顺序有区别吗?我已经在
Select
运算符之前和之后放置了重新调度方法,但我不太确定。坦率地说,我什至不确定是否需要Select
。 - 我看过一些将
Binding.IsAsync
设置为true
的例子,但我试过了,似乎没有太大区别,但同样,也许是因为其他因素。 - 这里
SynchronizationContext
和ThreadPriority
的概念相关吗?有没有办法在显示的代码中配置它们? - 我应该使用
ReactiveCommand
还是其他 ReactiveUI class?
最伤脑筋的事实是,使用 一些 组合,计算正常,但阻止 UI,而使用其他一些组合,值是异步计算的,UI 稍微少一点阻塞,但有时派生值的 部分 不可用(例如,在项目集合中)!
抱歉,如果我要求太多,但我没有在文档中找到任何权威的预期方法来完成我需要的事情。
在可能阻塞 UI 的 Select
之前观察 TaskPoolScheduler。
在 ToProperty
之前观察 MainThreadScheduler。
this.WhenAnyValue(x => x.Original)
.Where(originalValue => originalValue != null)
.ObserveOn(TaskPoolScheduler.Default)
.Select(derivedValue => LongRunningCalculation(originalValue))
.ObserveOn(RxApp.MainThreadScheduler)
.ToProperty(this, x => x.Derived, out _derived);
另外
人们对 SubscribeOn
的实际作用感到非常困惑。有很多解释。例如,在此处的另一个答案中给出
SubscribeOn moves up the observable chain to the top and make sure the observable produces values on the given scheduler
这不是真的。查看 RX 代码库中 SubscribeOn
的实现很有启发意义。你必须跳过几层抽象才能到达那里,但最终你会找到。
public static IObservable<TSource>
SubscribeOn<TSource>
( IObservable<TSource> source
, IScheduler scheduler
)
{
if (source == null)
throw new ArgumentNullException("source");
if (scheduler == null)
throw new ArgumentNullException("scheduler");
return (IObservable<TSource>) new AnonymousObservable<TSource>((Func<IObserver<TSource>, IDisposable>) (observer =>
{
SingleAssignmentDisposable assignmentDisposable = new SingleAssignmentDisposable();
SerialDisposable d = new SerialDisposable();
d.Disposable = (IDisposable) assignmentDisposable;
assignmentDisposable.Disposable = scheduler.Schedule((Action) (() => d.Disposable = (IDisposable) new ScheduledDisposable(scheduler, source.SubscribeSafe<TSource>(observer))));
return (IDisposable) d;
}));
}
唯一要做的就是确保在指定的调度程序上调用 source
上的 Subscribe
方法,并确保由相同 Subscribe
方法也会在指定的调度程序上调用。这对下游代码的影响是多种多样的。
例如
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
namespace SubscribeOnVsObserveOn
{
class Program
{
static readonly Subject<object> EventsSubject = new Subject<object>();
private static readonly IObservable<object> Events = Observable.Create<object>
( observer =>
{
Info( "Subscribing" );
return EventsSubject.Subscribe( observer );
} );
public static void Info(string msg)
{
var currentThread = Thread.CurrentThread;
var currentThreadName = string.IsNullOrWhiteSpace( currentThread.Name ) ? "<no name>" : currentThread.Name;
Console.WriteLine
( $"Thread Id {currentThread.ManagedThreadId} {currentThreadName} - " + msg );
}
public static void Foo()
{
Thread.CurrentThread.Name = "Main Thread";
Info( "Starting" );
void OnNext(object o) => Info( $"Received {o}" );
void Notify(object obj)
{
Info( $"Sending {obj}" );
EventsSubject.OnNext( obj );
}
void StartAndSend(object o, string threadName)
{
var thread = new Thread(Notify);
thread.Name = threadName;
thread.Start(o);
thread.Join();
}
Notify(1);
Console.WriteLine("=============================================" );
Console.WriteLine("Subscribe Only" );
Console.WriteLine("=============================================" );
using (Events.Subscribe(OnNext))
{
Thread.Sleep( 200 );
StartAndSend(2, "A");
StartAndSend(3, "B");
}
Console.WriteLine("=============================================" );
Console.WriteLine("Subscribe With SubscribeOn(CurrentThreadScheduler)" );
Console.WriteLine("=============================================" );
using (Events.SubscribeOn( CurrentThreadScheduler.Instance ).Subscribe(OnNext))
{
Thread.Sleep( 200 );
StartAndSend(2, "A");
StartAndSend(3, "B");
}
Console.WriteLine("=============================================" );
Console.WriteLine("Subscribe With SubscribeOn(ThreadPool)" );
Console.WriteLine("=============================================" );
using (Events.SubscribeOn( ThreadPoolScheduler.Instance ).Subscribe(OnNext))
{
Thread.Sleep( 200 );
StartAndSend(2, "A");
StartAndSend(3, "B");
}
Console.WriteLine("=============================================" );
Console.WriteLine("Subscribe With SubscribeOn(NewThread)" );
Console.WriteLine("=============================================" );
using (Events.SubscribeOn( NewThreadScheduler.Default ).Subscribe(OnNext))
{
Thread.Sleep( 200 );
StartAndSend(2, "A");
StartAndSend(3, "B");
}
Console.WriteLine("=============================================" );
Console.WriteLine("Subscribe With SubscribeOn(NewThread) + ObserveOn" );
Console.WriteLine("=============================================" );
using (Events.SubscribeOn( NewThreadScheduler.Default ).ObserveOn(TaskPoolScheduler.Default ).Subscribe(OnNext))
{
Thread.Sleep( 200 );
StartAndSend(2, "A");
StartAndSend(3, "B");
}
}
static void Main(string[] args)
{
Foo();
Console.WriteLine( "Press Any Key" );
Console.ReadLine();
}
}
}
生成以下输出
Thread Id 1 Main Thread - Starting
Thread Id 1 Main Thread - Sending 1
=============================================
Subscribe Only
=============================================
Thread Id 1 Main Thread - Subscribing
Thread Id 4 A - Sending 2
Thread Id 4 A - Received 2
Thread Id 5 B - Sending 3
Thread Id 5 B - Received 3
=============================================
Subscribe With SubscribeOn(CurrentThreadScheduler)
=============================================
Thread Id 1 Main Thread - Subscribing
Thread Id 6 A - Sending 2
Thread Id 6 A - Received 2
Thread Id 7 B - Sending 3
Thread Id 7 B - Received 3
=============================================
Subscribe With SubscribeOn(ThreadPool)
=============================================
Thread Id 8 <no name> - Subscribing
Thread Id 10 A - Sending 2
Thread Id 10 A - Received 2
Thread Id 11 B - Sending 3
Thread Id 11 B - Received 3
=============================================
Subscribe With SubscribeOn(NewThread)
=============================================
Thread Id 12 <no name> - Subscribing
Thread Id 13 A - Sending 2
Thread Id 13 A - Received 2
Thread Id 14 B - Sending 3
Thread Id 14 B - Received 3
=============================================
Subscribe With SubscribeOn(NewThread) + ObserveOn
=============================================
Thread Id 16 <no name> - Subscribing
Thread Id 17 A - Sending 2
Thread Id 19 B - Sending 3
Thread Id 18 <no name> - Received 2
Thread Id 18 <no name> - Received 3
Press Any Key
要点是 SubscribeOn 既不能强制事件的发送或接收在特定的调度程序上进行。它只能强制 Subscribe
方法在特定的调度程序上发生。此可能或可能不具有下游/上游影响。
调度程序
在 Rx.NET 中有一些调度程序,包括 WPF 独有的特殊调度程序。
TaskPoolScheduler
运行s 任务池代码。这有点像Task
. 中的 运行ning 代码
NewThreadScheduler
生成一个新线程以 运行 上的代码。一般不要使用这个运算符,除非你知道你 "need" 它(你几乎从不使用)DispatcherScheduler
运行s 代码在 UI 线程上。当您要在 VM 中设置属性时使用它
RxUI 带来了两个与平台无关的调度程序抽象。无论您在什么平台上(WPF、UWP、Xamarin.iOS、Xamarin.Android),RxApp.MainThreadScheduler
将始终引用 UI 线程调度程序,而 RxApp.TaskPoolScheduler
将引用类似于后台线程的内容。
如果您想保持简单,只需使用 RxApp
调度程序; RxApp.MainThreadScheduler
用于 UI 东西,RxApp.TaskPoolScheduler
用于 background/heavy 值班东西。
ObserveOn/SubscribeOn
名称 SubscribeOn()
有点令人困惑,因为它不直接影响 Subscribe()
方法。 SubscribeOn()
决定 observable 将在哪个调度器上启动; original/first 订阅将在哪个调度器上完成(而不是 Subscribe()
方法将在哪个调度器上执行)。我喜欢认为 SubsribeOn()
将 observable 链向上移动到顶部,并确保 observable 在给定的调度程序上产生值。
一些运营商让您指定他们应该 运行 在哪个调度程序上。当他们这样做时,你应该总是更喜欢传递一个调度程序,这样你就知道他们将在哪里工作并防止他们潜在地阻塞 UI thead(尽管他们不应该)。 SubsribeOn()
是一种 "hack" 用于不允许您指定调度程序的可观察对象。如果您使用 SubscribeOn()
,但操作员指定了一个调度程序,则来自操作员的信号将在操作员调度程序上发出,而不是您在 SubscribeOn()
中指定的调度程序。
ObserveOn()
与 SubscribeOn()
的功能大致相同,但它确实做到了 "from this point onwards"。 ObserveOn()
之后的运算符和代码将在给定 ObserveOn()
的调度程序上执行。我喜欢认为 ObserveOn()
意味着 "change thread to this one".
做繁重的工作
如果您要进行繁重的工作,请将其放入一个函数中并调用该函数,就像您在 LongRunningCalculation()
中所做的那样。您可以在 Select()
之前使用 ObserveOn(RxApp.TaskPoolScheduler)
并在其之后使用 ObserveOn(RxApp.MainThreadScheduler
,但我更喜欢将 Observable.Start()
与 SelectMany()
.[=47= 结合使用]
Observable.Start()
基本上是 Observable.Return()
函数:"Give me the result of this function as an observable." 您还可以指定它应该调用函数的调度程序。
SelectMany()
确保我们得到可观察对象的结果,而不是可观察对象本身。 (这有点像 Observable 的 await
:"don't execute this next operator before we have the result of this observable")
派生属性
你做的派生 属性 正确。
使用 WhenAnyValue()
获取 属性 的更改并将其通过管道传输到 ToProperty()
。您放在中间的运算符可能会在后台线程上工作,从而延迟派生 属性 的设置,但这就是我们有 INotifyPropertyChanged
.
我的看法
以下是我将如何实施您的具体示例:
public TOriginal Original
{
get { return _original; }
set { this.RaiseAndSetIfChanged(ref _original, value); }
}
TOriginal _original;
public TDerived Derived { get { return _derived.Value; } }
readonly ObservableAsPropertyHelper<double[,]> _derived;
_derived = this.WhenAnyValue(x => x.Original)
.Where(originalValue => originalValue != null)
// Sepcify the scheduler to the operator directly
.SelectMany(originalValue =>
Observable.Start(
() => LongRunningCalculation(originalValue),
RxApp.TaskPoolScheduler))
.ObserveOn(RxApp.MainThreadScheduler)
// I prefer this overload of ToProperty, which returns an ObservableAsPropertyHelper
.ToProperty(this, x => x.Derived);
我们有一个 Reactive 的 Slack 团队UI,欢迎您加入。您可以通过单击 here
请求邀请