Reactive Extensions 展开/扫描嵌套层次结构的方法
Reactive Extensions unfold / scan approach for nested hierarchy
我目前正在为一个应用程序构建一个向导系统,我们正在使用 ReactiveUI,结果是 Rx。
向导中的每个步骤都实现 IWizardStep<T>
,其中 T
只是向导最终生成的数据类型。
每个步骤都能够显示哪个步骤可供用户移动到下一个步骤,以便根据数据输入启用分支。
可以认为该步骤具有与以下类似的结构:
public interface IWizardStep<T>
{
IObservable<IStepChoice<T>> NextStepChoice {get;}
}
IStepChoice<T>
只是:
public interface IStepChoice<T>
{
IWizardStep<T> Step {get;}
string Reason {get;}
}
为了计算当前从开始到结束的路径,以便显示给用户,我需要能够从开始的步骤开始,并沿着 NextStepChoice
链递归地走,直到它命中 null(NextStepChoice
observable 发出 null 以指示向导结束是有效行为)。
我已经看过 Observable.Scan
,但我终其一生都想不出如何使它递归地正常工作。
我还查看了 Observable.Generate
,它看起来很有希望,因为这是一个 classic 展开式问题;唯一的问题是 Generate
需要一个函数来确定何时打破循环,但我需要评估内部可观察量来解决这个问题。
Observable.Generate(
new WizardStepChoice<T>(start, null),
choice => choice != null,
choice => choice.ChosenStep.NextStepChoice,
choice => choice);
这将是理想的,并产生我想要的输出,但那里的 NextStepChoice
选择器显然无法编译,因为它是 IObservable<IWizardStepChoice<T>>
而不是 IWizardStepChoice<T>
。
我考虑过使用 Aggregate
和 Scan
,但由于这些更多是折叠驱动的操作,而且我只有起始元素,所以它是我正在寻找的展开ala Generate
,但我需要它来评估嵌套的可观察对象。
Observable.Create
可能是我可以利用的东西吗?我试过了并想出了:
Path = CurrentStep.Select(_ => Observable.Create<IWizardStep<T>>(async observer =>
{
IWizardStepChoice<T> next = new WizardStepChoice<T>(start, null);
observer.OnNext(next.ChosenStep);
while (next != null)
{
next = await next.ChosenStep.NextStepChoice;
observer.OnNext(next.ChosenStep);
}
observer.OnCompleted();
return Disposable.Empty;
}).Aggregate(new List<IWizardStep<T>>(),
(l, s) =>
{
l.Add(s);
return l;
})).Switch().Publish().RefCount();
里面有我想要的所有正确的签名IWizardStep<T>->IReadOnlyList<IWizardStep<T>>
,所以乍一看似乎是正确的,但它不起作用;它会触发,我可以单步执行,但它一旦遇到等待就会挂起并且不会返回。
我感觉我很接近,这是一个日程安排问题,所以我的问题是:
- 解决这个问题的最佳方法是什么,我接近了吗?
- 如果这是正确的,为什么 await 有问题,我该如何解决?
更新
经过一些修补后,我注意到 await 可能挂起,因为那个 observable 还没有(也不会)发出一个值 (duh) ,我现在通过在向导开始时用一个值初始化每个步骤来解决这个问题。
我什至通过将 属性 添加到 IWizardStep<T>
- IWizardStepChoice<T> LatestStepChoice {get;}
来对此进行完整性检查,它刚刚连接到:
NextStepChoice.Subscribe(c => _latestStepChoice = c);
这是在步骤 class 本身完成的,我可以确认它工作得很好。
但是等待仍然挂起,所以我尝试了:
- 使其成为
Replay(1)
以便等待调用 .Subscribe()
将获得值 - 这没有用
- 实现
Repeat()
所以即使订阅了某些内容,它也会看到新值 - 这只会让整个事情挂起。
显然我在这里遗漏了一些东西,我想要它以便当 await 查询可观察对象时,它会被赋予看到的最新值,这正是我认为 Replay(1)
会实现的;我也尝试过 PublishLast()
但由于 AsyncSubject<T>
行为,以后的更新不会得到尊重。
现在我已经切换到使用自订阅 属性,但这并不理想,如果可以的话,我宁愿不必中断查询 observables,感觉 "hacky".
递归遍历可以将可观察对象树转换为单个可观察对象:
static IObservable<IWizardStep<T>> Walk<T>(IWizardStep<T> step)
{
if (step?.NextStepChoice == null)
return Observable.Return(step);
return step.NextStepChoice.SelectMany(choice => Walk(choice.Step)).StartWith(step);
}
用法:
var steps = await Walk(step).ToArray();
我目前正在为一个应用程序构建一个向导系统,我们正在使用 ReactiveUI,结果是 Rx。
向导中的每个步骤都实现 IWizardStep<T>
,其中 T
只是向导最终生成的数据类型。
每个步骤都能够显示哪个步骤可供用户移动到下一个步骤,以便根据数据输入启用分支。
可以认为该步骤具有与以下类似的结构:
public interface IWizardStep<T>
{
IObservable<IStepChoice<T>> NextStepChoice {get;}
}
IStepChoice<T>
只是:
public interface IStepChoice<T>
{
IWizardStep<T> Step {get;}
string Reason {get;}
}
为了计算当前从开始到结束的路径,以便显示给用户,我需要能够从开始的步骤开始,并沿着 NextStepChoice
链递归地走,直到它命中 null(NextStepChoice
observable 发出 null 以指示向导结束是有效行为)。
我已经看过 Observable.Scan
,但我终其一生都想不出如何使它递归地正常工作。
我还查看了 Observable.Generate
,它看起来很有希望,因为这是一个 classic 展开式问题;唯一的问题是 Generate
需要一个函数来确定何时打破循环,但我需要评估内部可观察量来解决这个问题。
Observable.Generate(
new WizardStepChoice<T>(start, null),
choice => choice != null,
choice => choice.ChosenStep.NextStepChoice,
choice => choice);
这将是理想的,并产生我想要的输出,但那里的 NextStepChoice
选择器显然无法编译,因为它是 IObservable<IWizardStepChoice<T>>
而不是 IWizardStepChoice<T>
。
我考虑过使用 Aggregate
和 Scan
,但由于这些更多是折叠驱动的操作,而且我只有起始元素,所以它是我正在寻找的展开ala Generate
,但我需要它来评估嵌套的可观察对象。
Observable.Create
可能是我可以利用的东西吗?我试过了并想出了:
Path = CurrentStep.Select(_ => Observable.Create<IWizardStep<T>>(async observer =>
{
IWizardStepChoice<T> next = new WizardStepChoice<T>(start, null);
observer.OnNext(next.ChosenStep);
while (next != null)
{
next = await next.ChosenStep.NextStepChoice;
observer.OnNext(next.ChosenStep);
}
observer.OnCompleted();
return Disposable.Empty;
}).Aggregate(new List<IWizardStep<T>>(),
(l, s) =>
{
l.Add(s);
return l;
})).Switch().Publish().RefCount();
里面有我想要的所有正确的签名IWizardStep<T>->IReadOnlyList<IWizardStep<T>>
,所以乍一看似乎是正确的,但它不起作用;它会触发,我可以单步执行,但它一旦遇到等待就会挂起并且不会返回。
我感觉我很接近,这是一个日程安排问题,所以我的问题是:
- 解决这个问题的最佳方法是什么,我接近了吗?
- 如果这是正确的,为什么 await 有问题,我该如何解决?
更新
经过一些修补后,我注意到 await 可能挂起,因为那个 observable 还没有(也不会)发出一个值 (duh) ,我现在通过在向导开始时用一个值初始化每个步骤来解决这个问题。
我什至通过将 属性 添加到 IWizardStep<T>
- IWizardStepChoice<T> LatestStepChoice {get;}
来对此进行完整性检查,它刚刚连接到:
NextStepChoice.Subscribe(c => _latestStepChoice = c);
这是在步骤 class 本身完成的,我可以确认它工作得很好。
但是等待仍然挂起,所以我尝试了:
- 使其成为
Replay(1)
以便等待调用.Subscribe()
将获得值 - 这没有用 - 实现
Repeat()
所以即使订阅了某些内容,它也会看到新值 - 这只会让整个事情挂起。
显然我在这里遗漏了一些东西,我想要它以便当 await 查询可观察对象时,它会被赋予看到的最新值,这正是我认为 Replay(1)
会实现的;我也尝试过 PublishLast()
但由于 AsyncSubject<T>
行为,以后的更新不会得到尊重。
现在我已经切换到使用自订阅 属性,但这并不理想,如果可以的话,我宁愿不必中断查询 observables,感觉 "hacky".
递归遍历可以将可观察对象树转换为单个可观察对象:
static IObservable<IWizardStep<T>> Walk<T>(IWizardStep<T> step)
{
if (step?.NextStepChoice == null)
return Observable.Return(step);
return step.NextStepChoice.SelectMany(choice => Walk(choice.Step)).StartWith(step);
}
用法:
var steps = await Walk(step).ToArray();