如何使用 RX 实现超时序列?

How to achieve sequence of timeouts with RX?

场景如下:通信过来的设备如果在短时间内回调到服务器,则认为已连接。 我想创建一个 class 来封装跟踪此状态的功能。在调用设备时,应重置超时。回调时,连接确认,状态设置为true,如果回调超时,设置为false。但是下一次调用应该可以重新设置超时,无视当前状态。

我正在考虑使用 swithtimeout 通过 RX 实现此目的。但我不知道为什么它停止工作。

public class ConnectionStatus
{
private Subject<bool> pending = new Subject<bool>();
private Subject<bool> connected = new Subject<bool>();

public bool IsConnected { get; private set; }

public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
{
    pending.Select(outer => connected.Timeout(TimeSpan.FromSeconds(timeoutSeconds))) 
        .Switch()
        .Subscribe(_ => IsConnected = true, e => IsConnected = false, token);
}

public void ConfirmConnected()
{
    connected.OnNext(true);
}

public void SetPending()
{
    pending.OnNext(true);
}
}

这是"test case":

var c = new ConnectionStatus(default(CancellationToken));

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(5));
c.ConfirmConnected();   
c.IsConnected.Dump(); // TRUE, OK

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(5));
c.ConfirmConnected();
c.IsConnected.Dump(); // TRUE, OK

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(20));
c.IsConnected.Dump(); // FALSE, OK
c.ConfirmConnected(); 
c.IsConnected.Dump(); // FALSE, OK

c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(10));
c.ConfirmConnected(); 
c.IsConnected.Dump(); // FALSE, NOT OK!

我假设内部 Observable 的超时也停止了外部 Observable。由于不再调用 outer => lambda。正确的做法是什么?

谢谢

问题是 Timeout 本质上会导致异常破坏 Rx 订阅。触发超时后(如您所编码),将不会发送其他通知。 Rx 语法是你可以有 * OnNext 消息后跟一个 OnCompleted 或一个 OnError。来自 TimeoutOnError 发送后,您将看不到更多消息。

您需要通过 OnNext 消息而不是 OnError 消息传递超时消息。在您的旧代码中,您将任何 OnError 变为假,将任何 OnNext 变为真。相反,您需要将正确的新 IsConnected 值嵌入到 OnNext 消息中。方法如下:

public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
{
    pending.Select(_ => connected
            .Timeout(TimeSpan.FromSeconds(timeoutSeconds))
            .Materialize()
            .Select(n => n.Kind == NotificationKind.OnError && n.Exception.GetType() == typeof(TimeoutException) 
                ? Notification.CreateOnNext(false)
                : n)
            .Dematerialize()
            .Take(1)
        )
        .Switch()
        .Subscribe(b => IsConnected = b, token);
}

这里有另一种方法可以在不使用 .TimeOut 的情况下生成 IsConnected 值流:

public class ConnectionStatus
{
    private Subject<Unit> pending = new Subject<Unit>();
    private Subject<Unit> connected = new Subject<Unit>();

    public bool IsConnected { get; private set; }

    public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
    {
        pending
            .Select(outer =>
                Observable.Amb(
                    connected.Select(_ => true),
                    Observable.Timer(TimeSpan.FromSeconds(timeoutSeconds)).Select(_ => false)))
            .Switch()
            .Subscribe(isConnected => IsConnected = isConnected, token);
    }

    public void ConfirmConnected()
    {
        connected.OnNext(Unit.Default);
    }

    public void SetPending()
    {
        pending.OnNext(Unit.Default);
    }
}

Observable.Amb 运算符只是从最先产生值的可观察对象中获取一个值 - 它比使用异常编码更可取。