如何使用 RX 实现超时序列?
How to achieve sequence of timeouts with RX?
场景如下:通信过来的设备如果在短时间内回调到服务器,则认为已连接。
我想创建一个 class 来封装跟踪此状态的功能。在调用设备时,应重置超时。回调时,连接确认,状态设置为true,如果回调超时,设置为false。但是下一次调用应该可以重新设置超时,无视当前状态。
我正在考虑使用 swith
和 timeout
通过 RX 实现此目的。但我不知道为什么它停止工作。
public class ConnectionStatus
{
private Subject<bool> pending = new Subject<bool>();
private Subject<bool> connected = new Subject<bool>();
public bool IsConnected { get; private set; }
public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
{
pending.Select(outer => connected.Timeout(TimeSpan.FromSeconds(timeoutSeconds)))
.Switch()
.Subscribe(_ => IsConnected = true, e => IsConnected = false, token);
}
public void ConfirmConnected()
{
connected.OnNext(true);
}
public void SetPending()
{
pending.OnNext(true);
}
}
这是"test case":
var c = new ConnectionStatus(default(CancellationToken));
c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(5));
c.ConfirmConnected();
c.IsConnected.Dump(); // TRUE, OK
c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(5));
c.ConfirmConnected();
c.IsConnected.Dump(); // TRUE, OK
c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(20));
c.IsConnected.Dump(); // FALSE, OK
c.ConfirmConnected();
c.IsConnected.Dump(); // FALSE, OK
c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(10));
c.ConfirmConnected();
c.IsConnected.Dump(); // FALSE, NOT OK!
我假设内部 Observable 的超时也停止了外部 Observable。由于不再调用 outer =>
lambda。正确的做法是什么?
谢谢
问题是 Timeout
本质上会导致异常破坏 Rx 订阅。触发超时后(如您所编码),将不会发送其他通知。 Rx 语法是你可以有 * OnNext
消息后跟一个 OnCompleted
或一个 OnError
。来自 Timeout
的 OnError
发送后,您将看不到更多消息。
您需要通过 OnNext
消息而不是 OnError
消息传递超时消息。在您的旧代码中,您将任何 OnError
变为假,将任何 OnNext
变为真。相反,您需要将正确的新 IsConnected
值嵌入到 OnNext
消息中。方法如下:
public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
{
pending.Select(_ => connected
.Timeout(TimeSpan.FromSeconds(timeoutSeconds))
.Materialize()
.Select(n => n.Kind == NotificationKind.OnError && n.Exception.GetType() == typeof(TimeoutException)
? Notification.CreateOnNext(false)
: n)
.Dematerialize()
.Take(1)
)
.Switch()
.Subscribe(b => IsConnected = b, token);
}
这里有另一种方法可以在不使用 .TimeOut
的情况下生成 IsConnected
值流:
public class ConnectionStatus
{
private Subject<Unit> pending = new Subject<Unit>();
private Subject<Unit> connected = new Subject<Unit>();
public bool IsConnected { get; private set; }
public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
{
pending
.Select(outer =>
Observable.Amb(
connected.Select(_ => true),
Observable.Timer(TimeSpan.FromSeconds(timeoutSeconds)).Select(_ => false)))
.Switch()
.Subscribe(isConnected => IsConnected = isConnected, token);
}
public void ConfirmConnected()
{
connected.OnNext(Unit.Default);
}
public void SetPending()
{
pending.OnNext(Unit.Default);
}
}
Observable.Amb
运算符只是从最先产生值的可观察对象中获取一个值 - 它比使用异常编码更可取。
场景如下:通信过来的设备如果在短时间内回调到服务器,则认为已连接。 我想创建一个 class 来封装跟踪此状态的功能。在调用设备时,应重置超时。回调时,连接确认,状态设置为true,如果回调超时,设置为false。但是下一次调用应该可以重新设置超时,无视当前状态。
我正在考虑使用 swith
和 timeout
通过 RX 实现此目的。但我不知道为什么它停止工作。
public class ConnectionStatus
{
private Subject<bool> pending = new Subject<bool>();
private Subject<bool> connected = new Subject<bool>();
public bool IsConnected { get; private set; }
public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
{
pending.Select(outer => connected.Timeout(TimeSpan.FromSeconds(timeoutSeconds)))
.Switch()
.Subscribe(_ => IsConnected = true, e => IsConnected = false, token);
}
public void ConfirmConnected()
{
connected.OnNext(true);
}
public void SetPending()
{
pending.OnNext(true);
}
}
这是"test case":
var c = new ConnectionStatus(default(CancellationToken));
c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(5));
c.ConfirmConnected();
c.IsConnected.Dump(); // TRUE, OK
c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(5));
c.ConfirmConnected();
c.IsConnected.Dump(); // TRUE, OK
c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(20));
c.IsConnected.Dump(); // FALSE, OK
c.ConfirmConnected();
c.IsConnected.Dump(); // FALSE, OK
c.SetPending();
await Task.Delay(TimeSpan.FromSeconds(10));
c.ConfirmConnected();
c.IsConnected.Dump(); // FALSE, NOT OK!
我假设内部 Observable 的超时也停止了外部 Observable。由于不再调用 outer =>
lambda。正确的做法是什么?
谢谢
问题是 Timeout
本质上会导致异常破坏 Rx 订阅。触发超时后(如您所编码),将不会发送其他通知。 Rx 语法是你可以有 * OnNext
消息后跟一个 OnCompleted
或一个 OnError
。来自 Timeout
的 OnError
发送后,您将看不到更多消息。
您需要通过 OnNext
消息而不是 OnError
消息传递超时消息。在您的旧代码中,您将任何 OnError
变为假,将任何 OnNext
变为真。相反,您需要将正确的新 IsConnected
值嵌入到 OnNext
消息中。方法如下:
public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
{
pending.Select(_ => connected
.Timeout(TimeSpan.FromSeconds(timeoutSeconds))
.Materialize()
.Select(n => n.Kind == NotificationKind.OnError && n.Exception.GetType() == typeof(TimeoutException)
? Notification.CreateOnNext(false)
: n)
.Dematerialize()
.Take(1)
)
.Switch()
.Subscribe(b => IsConnected = b, token);
}
这里有另一种方法可以在不使用 .TimeOut
的情况下生成 IsConnected
值流:
public class ConnectionStatus
{
private Subject<Unit> pending = new Subject<Unit>();
private Subject<Unit> connected = new Subject<Unit>();
public bool IsConnected { get; private set; }
public ConnectionStatus(CancellationToken token, short timeoutSeconds = 15)
{
pending
.Select(outer =>
Observable.Amb(
connected.Select(_ => true),
Observable.Timer(TimeSpan.FromSeconds(timeoutSeconds)).Select(_ => false)))
.Switch()
.Subscribe(isConnected => IsConnected = isConnected, token);
}
public void ConfirmConnected()
{
connected.OnNext(Unit.Default);
}
public void SetPending()
{
pending.OnNext(Unit.Default);
}
}
Observable.Amb
运算符只是从最先产生值的可观察对象中获取一个值 - 它比使用异常编码更可取。