Reactive Extensions 展开/扫描嵌套层次结构的方法

Reactive Extensions unfold / scan approach for nested hierarchy

我目前正在为一个应用程序构建一个向导系统,我们正在使用 ReactiveUI,结果是 Rx。

向导中的每个步骤都实现 IWizardStep<T>,其中 T 只是向导最终生成的数据类型。

每个步骤都能够显示哪个步骤可供用户移动到下一个步骤,以便根据数据输入启用分支。

可以认为该步骤具有与以下类似的结构:

public interface IWizardStep<T>
{
    IObservable<IStepChoice<T>> NextStepChoice {get;}
}

IStepChoice<T> 只是:

public interface IStepChoice<T>
{
    IWizardStep<T> Step {get;}
    string Reason {get;}
}

为了计算当前从开始到结束的路径,以便显示给用户,我需要能够从开始的步骤开始,并沿着 NextStepChoice 链递归地走,直到它命中 null(NextStepChoice observable 发出 null 以指示向导结束是有效行为)。

我已经看过 Observable.Scan,但我终其一生都想不出如何使它递归地正常工作。

我还查看了 Observable.Generate,它看起来很有希望,因为这是一个 classic 展开式问题;唯一的问题是 Generate 需要一个函数来确定何时打破循环,但我需要评估内部可观察量来解决这个问题。

Observable.Generate(
    new WizardStepChoice<T>(start, null),
    choice => choice != null,
    choice => choice.ChosenStep.NextStepChoice,
    choice => choice);

这将是理想的,并产生我想要的输出,但那里的 NextStepChoice 选择器显然无法编译,因为它是 IObservable<IWizardStepChoice<T>> 而不是 IWizardStepChoice<T>

我考虑过使用 AggregateScan,但由于这些更多是折叠驱动的操作,而且我只有起始元素,所以它是我正在寻找的展开ala Generate,但我需要它来评估嵌套的可观察对象。

Observable.Create 可能是我可以利用的东西吗?我试过了并想出了:

Path = CurrentStep.Select(_ => Observable.Create<IWizardStep<T>>(async observer =>
 {
     IWizardStepChoice<T> next = new WizardStepChoice<T>(start, null);
     observer.OnNext(next.ChosenStep);
     while (next != null)
     {
         next = await next.ChosenStep.NextStepChoice;
         observer.OnNext(next.ChosenStep);
     }
     observer.OnCompleted();
     return Disposable.Empty;
 }).Aggregate(new List<IWizardStep<T>>(),
 (l, s) =>
 {
     l.Add(s);
     return l;
 })).Switch().Publish().RefCount();

里面有我想要的所有正确的签名IWizardStep<T>->IReadOnlyList<IWizardStep<T>>,所以乍一看似乎是正确的,但它不起作用;它会触发,我可以单步执行,但它一旦遇到等待就会挂起并且不会返回。

我感觉我很接近,这是一个日程安排问题,所以我的问题是:

  1. 解决这个问题的最佳方法是什么,我接近了吗?
  2. 如果这是正确的,为什么 await 有问题,我该如何解决?

更新

经过一些修补后,我注意到 await 可能挂起,因为那个 observable 还没有(也不会)发出一个值 (duh) ,我现在通过在向导开始时用一个值初始化每个步骤来解决这个问题。

我什至通过将 属性 添加到 IWizardStep<T> - IWizardStepChoice<T> LatestStepChoice {get;} 来对此进行完整性检查,它刚刚连接到:

NextStepChoice.Subscribe(c => _latestStepChoice = c);

这是在步骤 class 本身完成的,我可以确认它工作得很好。

但是等待仍然挂起,所以我尝试了:

  1. 使其成为 Replay(1) 以便等待调用 .Subscribe() 将获得值 - 这没有用
  2. 实现 Repeat() 所以即使订阅了某些内容,它也会看到新值 - 这只会让整个事情挂起。

显然我在这里遗漏了一些东西,我想要它以便当 await 查询可观察对象时,它会被赋予看到的最新值,这正是我认为 Replay(1) 会实现的;我也尝试过 PublishLast() 但由于 AsyncSubject<T> 行为,以后的更新不会得到尊重。

现在我已经切换到使用自订阅 属性,但这并不理想,如果可以的话,我宁愿不必中断查询 observables,感觉 "hacky".

递归遍历可以将可观察对象树转换为单个可观察对象:

    static IObservable<IWizardStep<T>> Walk<T>(IWizardStep<T> step)
    {
        if (step?.NextStepChoice == null)
            return Observable.Return(step);

        return step.NextStepChoice.SelectMany(choice => Walk(choice.Step)).StartWith(step);
    }

用法:

var steps = await Walk(step).ToArray();