并行执行 OnNext 但与 UI 线程同步订阅
Execute OnNext in parallel but sync subscription with UI thread
给出这样的主题:
var input = new Subject<int>();
订阅者是这样的:
var observer1 = input
.Subscribe(ev =>
{
Thread.Sleep(1000);
listBox.Items.Add("o1: " + ev.ToString());
});
var observer2 = input
.Subscribe(ev =>
{
Thread.Sleep(1000);
listBox.Items.Add("o2: " + ev.ToString());
});
我怎样才能调整它以便当我这样做时
Enumerable.Range(0, 1000).ToList().ForEach((i) =>
{
input.OnNext(i);
});
OnNext 称为异步,但订阅已同步到 UI 线程。 (是WinForms环境)
我试过了
var observer1 = input
.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(ev =>
{
Thread.Sleep(1000);
listBox.Items.Add("o1: " + ev.ToString());
});
而且由于 ObserveOn,我可以看到 observer1 和 observer2 无需等待就立即开始他们的订阅操作。但是现在由于在非 ui 线程上调用列表框,应用程序崩溃了。一旦我将订阅的操作与 UI 步同步,我就会松开 OnNext 调用的并行处理。
知道如何解决吗?
刚发现如果我这样做
var observer1 = input
.Subscribe(ev =>
{
await Task.Yield();
Thread.Sleep(1000);
listBox.Items.Add("o1: " + ev.ToString());
});
var observer2 = input
.Subscribe(ev =>
{
await Task.Yield();
Thread.Sleep(1000);
listBox.Items.Add("o2: " + ev.ToString());
});
有效。但是没有一个更干净的、基于 Rx 的解决方案吗?
基本上我想在订阅中执行一个可以访问 UI 线程的即发即弃操作。
您需要将检索项目的工作与更新 UI 的工作分开。 运行 在后台线程上工作,然后将列表传递给 UI 以更新 ListBox。
试试这个:
input.ObserveOn(Scheduler.Default) // transition to background thread
.Select(evt => /* synchronous code returning item */)
.ObserveOn(frm) // use rx-winforms to get this extension method
.Subscribe(item => listBox.Items.Add(item));
ObserveOn
有许多重载,使用 rx-winforms 你可以传递一个 WinForms 控件(它可以是一个主机表单或一个组成部分)并且 Rx 将使用它来转换到 UI 线程。另一个重载接受 SynchronzationContext
个对象。
根据您拥有的项目的数量和频率,您可能需要考虑在 Select
之前或之后使用 Buffer
和 TimeSpan
将项目分批放入List
然后一次用多个项目更新 UI。这减少了上下文切换的次数,并且可以使 UI 更具响应性。
您可能会发现参考此内容很有帮助:ObserveOn and SubscribeOn - where the work is being done
请注意,这将一次处理每一项 - 这可能不是您想要的。在这种情况下,您可以使用 SelectMany
。假设您的后台工作封装在异步方法中,例如:
async Task<string> GetItem(int evt);
那么你可以这样做:
input.SelectMany(evt => GetItem(evt))
.ObserveOn(frm) // use rx-winforms to get this extension method
.Subscribe(item => listBox.Items.Add(item));
因为 GetItem
现在是异步的,它已经转换到后台线程 - 只要这发生得很快(即你应该在你的 GetItem
中有一个 await
第一个 long-运行 动作),不需要第一个 ObserveOn
.
SelectMany
将每个源事件投射到一个可观察流中,并将结果展平。它有一个接受异步方法的重载,并将其封装在 IObservable<T>
中 returns 单个结果并完成。
这意味着事件是并发处理的,现在事件可能会乱序返回 - 所以请确保这是可以接受的。有办法解决这个问题,但它确实超出了问题的范围。 (提示:Select
+Concat
)
给出这样的主题:
var input = new Subject<int>();
订阅者是这样的:
var observer1 = input
.Subscribe(ev =>
{
Thread.Sleep(1000);
listBox.Items.Add("o1: " + ev.ToString());
});
var observer2 = input
.Subscribe(ev =>
{
Thread.Sleep(1000);
listBox.Items.Add("o2: " + ev.ToString());
});
我怎样才能调整它以便当我这样做时
Enumerable.Range(0, 1000).ToList().ForEach((i) =>
{
input.OnNext(i);
});
OnNext 称为异步,但订阅已同步到 UI 线程。 (是WinForms环境)
我试过了
var observer1 = input
.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(ev =>
{
Thread.Sleep(1000);
listBox.Items.Add("o1: " + ev.ToString());
});
而且由于 ObserveOn,我可以看到 observer1 和 observer2 无需等待就立即开始他们的订阅操作。但是现在由于在非 ui 线程上调用列表框,应用程序崩溃了。一旦我将订阅的操作与 UI 步同步,我就会松开 OnNext 调用的并行处理。
知道如何解决吗?
刚发现如果我这样做
var observer1 = input
.Subscribe(ev =>
{
await Task.Yield();
Thread.Sleep(1000);
listBox.Items.Add("o1: " + ev.ToString());
});
var observer2 = input
.Subscribe(ev =>
{
await Task.Yield();
Thread.Sleep(1000);
listBox.Items.Add("o2: " + ev.ToString());
});
有效。但是没有一个更干净的、基于 Rx 的解决方案吗? 基本上我想在订阅中执行一个可以访问 UI 线程的即发即弃操作。
您需要将检索项目的工作与更新 UI 的工作分开。 运行 在后台线程上工作,然后将列表传递给 UI 以更新 ListBox。
试试这个:
input.ObserveOn(Scheduler.Default) // transition to background thread
.Select(evt => /* synchronous code returning item */)
.ObserveOn(frm) // use rx-winforms to get this extension method
.Subscribe(item => listBox.Items.Add(item));
ObserveOn
有许多重载,使用 rx-winforms 你可以传递一个 WinForms 控件(它可以是一个主机表单或一个组成部分)并且 Rx 将使用它来转换到 UI 线程。另一个重载接受 SynchronzationContext
个对象。
根据您拥有的项目的数量和频率,您可能需要考虑在 Select
之前或之后使用 Buffer
和 TimeSpan
将项目分批放入List
然后一次用多个项目更新 UI。这减少了上下文切换的次数,并且可以使 UI 更具响应性。
您可能会发现参考此内容很有帮助:ObserveOn and SubscribeOn - where the work is being done
请注意,这将一次处理每一项 - 这可能不是您想要的。在这种情况下,您可以使用 SelectMany
。假设您的后台工作封装在异步方法中,例如:
async Task<string> GetItem(int evt);
那么你可以这样做:
input.SelectMany(evt => GetItem(evt))
.ObserveOn(frm) // use rx-winforms to get this extension method
.Subscribe(item => listBox.Items.Add(item));
因为 GetItem
现在是异步的,它已经转换到后台线程 - 只要这发生得很快(即你应该在你的 GetItem
中有一个 await
第一个 long-运行 动作),不需要第一个 ObserveOn
.
SelectMany
将每个源事件投射到一个可观察流中,并将结果展平。它有一个接受异步方法的重载,并将其封装在 IObservable<T>
中 returns 单个结果并完成。
这意味着事件是并发处理的,现在事件可能会乱序返回 - 所以请确保这是可以接受的。有办法解决这个问题,但它确实超出了问题的范围。 (提示:Select
+Concat
)