组合依赖于其他可观察量的可观察量

Combining observables that depend upon other observables

我正在使用可观测高度来模拟无人机的飞行。高度应根据此方案变化:

  1. 高度从0增加到BaseAltitude,即固定高度。
  2. 到达BaseAltitude后,无人机开始巡航,描述一个正弦波,从BaseAltitude
  3. 开始
  4. 收到信号后,无人机应开始着陆。即从当前高度开始,无人机应该直线下降,直到到达0.

您可能已经注意到,当着陆开始时,高度在设计时是未知的。起飞顺序应以最后一个高度为起点。因此,一个序列取决于另一个序列产生的最后一个值。我脑袋疼!

好吧,我完全被这个困住了。

我目前仅有的代码如下。我把它用来说明问题。你会很快得到它...

public class Drone
{
    public Drone()
    {
        var interval = TimeSpan.FromMilliseconds(200);

        var takeOff = Observable.Interval(interval).TakeWhile(h => h < BaseAltitude).Select(t => (double)t);

        var cruise = Observable
            .Interval(interval).Select(t => 100 * Math.Sin(t * 2 * Math.PI / 180) + BaseAltitude)
            .TakeUntil(_ => IsLanding);

        var landing = Observable
            .Interval(interval).Select(t => ??? );

        Altitude = takeOff.Concat(cruise).Concat(landing);
    }

    public bool IsLanding { get; set; }
    public double BaseAltitude { get; set; } = 100;
    public IObservable<double> Altitude { get; }
}

您使用 LastAsync 获取 cruise 的最后一个值,然后 SelectMany 进入您想要的可观察对象。

您需要稍微更改 cruise 以处理多个订阅。

    var cruise = Observable.Interval(interval)
        .Select(t => 100 * Math.Sin(t * 2 * Math.PI / 180) + BaseAltitude)
        .TakeUntil(_ => IsLanding)
        .Replay(1)
        .RefCount();

    var landing = cruise
        .LastAsync()
        .SelectMany(maxAlt => Observable.Interval(interval).Select(i => maxAlt - i))
        .TakeWhile(alt => alt >= 0);

    Altitude = takeOff.Concat(cruise).Concat(landing);

为什么我需要.Replay(1).Refcount()

这里的一切都是冷观察,其中 none 将 运行 并发。 Concat 实际上确保它们不是并发的。所以你想要的大理石图看起来像这样:

t        : 1-2-3-4-5-6-7-8-9-0-1-2-3-4-5-6-7-8-...
takeOff  : 1-2-3-4-5-|
cruise   :           6-7-8-7-6-|
isLanding: T-------------------F----------------
landing  :                     5-4-3-2-1-0-|

如果你定义了landing = cruise.LastAsync()...那么它会在时间11尝试订阅cruise并获取最后一个值。

  • 如果您保留 cruise 的定义,它会尝试重新订阅一个新的冷可观察对象,这将导致 0 个元素,因为 isLanding 现在是假的。
  • 如果您将 .Publish().RefCount() 添加到 cruise 定义中,它会尝试订阅上一个已完成的可观察对象,这也会导致 0 个元素。
  • .Replay(1).Refcount() 缓存最后一个值,因此在可观察对象完成后订阅的任何订阅者仍将获得最后一个值(这就是你想要的)。

您真的应该尝试制作 observable,以便它可以模拟选择起飞或降落在任何时候 - 就像无人机的用户可能做的那样。

如果您像这样编写代码,那将变得非常简单:

public class Drone
{
    public Drone()
    {
        this.Altitude = ...
    }

    private bool _isLanding = true;
    private Subject<bool> _isLandings = new Subject<bool>();

    public bool IsLanding
    {
        get => _isLanding;
        set
        {
            _isLanding = value;
            _isLandings.OnNext(value);
        }
    }

    public double BaseAltitude { get; set; } = 100.0;
    public IObservable<double> Altitude { get; }
}

每次 IsLanding 更改时,私有 _isLandings 都会触发一个可用于更改无人机模式的值。

现在,Altitude 的定义从这个基本模式开始:

    this.Altitude =
        _isLandings
            .Select(x => x ? landing : takeOff.Concat(cruise))
            .Switch()
            .StartWith(altitude);

这里使用.Switch()是关键。每当 _isLandings 产生一个值时,开关就会在着陆或起飞之间进行选择。它成为一个响应上升或下降的单一可观察对象。

完整代码如下所示:

public class Drone
{
    public Drone()
    {
        var altitude = 0.0;
        var interval = TimeSpan.FromMilliseconds(200);

        IObservable<double> landing =
            Observable
                .Interval(interval)
                .TakeWhile(h => altitude > 0.0)
                .Select(t =>
                {
                    altitude -= 10.0;
                    altitude = altitude > 0.0 ? altitude : 0.0;
                    return altitude;
                });

        IObservable<double> takeOff =
            Observable
                .Interval(interval)
                .TakeWhile(h => altitude < BaseAltitude)
                .Select(t =>
                {
                    altitude += 10.0;
                    altitude = altitude < BaseAltitude ? altitude : BaseAltitude;
                    return altitude;
                });

        IObservable<double> cruise =
            Observable
                .Interval(interval)
                .Select(t =>
                {
                    altitude = 10.0 * Math.Sin(t * 2.0 * Math.PI / 180.0) + BaseAltitude;
                    return altitude;
                });

        this.Altitude =
            _isLandings
                .Select(x => x ? landing : takeOff.Concat(cruise))
                .Switch()
                .StartWith(altitude);
    }

    private bool _isLanding = true;
    private Subject<bool> _isLandings = new Subject<bool>();

    public bool IsLanding
    {
        get => _isLanding;
        set
        {
            _isLanding = value;
            _isLandings.OnNext(value);
        }
    }

    public double BaseAltitude { get; set; } = 100.0;
    public IObservable<double> Altitude { get; }
}

你可以用这个来测试它:

var drone = new Drone();

drone.Altitude.Subscribe(x => Console.WriteLine(x));

Thread.Sleep(2000);

drone.IsLanding = false;

Thread.Sleep(4000);

drone.IsLanding = true;