Rx.net 在 observable 的 disconnect/error 上实现重试功能
Rx.net implement retry functionality on disconnect/error in observable
下面是以下代码:
public class FooService
{
private ITransportService _transportService;
public FooService(ITransportService transportService)
{
_transportService = transportService;
_transportService.Connect();
}
public IDisposable Subscribe(IObserver<FooData> observer)
{
return _transportService.GetObservable()
.Subscribe(observer);
}
}
public interface ITransportService
{
ConnectionState State { get; }
bool Connect();
IObservable<FooData> GetObservable();
}
public class ClientConsumingProgram
{
class FooObserver : IObserver<FooData>
{
public void OnNext(FooData value)
{
//Client consuming without interruption
}
//.. on error.. onCompleted
}
public static void Main()
{
var fooService = new FooService(transportService);
var fooObserver = new FooObserver();
var disposable = fooService.Subscribe(fooObserver);
}
}
我想执行以下操作:
当传输服务断开连接时(从服务器关闭套接字),我希望应用程序重试几次,但 foo 服务首先需要在 _transportService
上调用 Connect
,然后 [=18] =] 已连接,调用 GetObservable.
如果 _transportService
在最大重试之前再次连接,FooObserver
上的 OnNext
会在客户端保持滴答,并且一旦超过最大重试次数,就应该触发错误 OnError。
有人可以为我指出正确的实施方向吗?
更新
public class FooService
{
private ITransportService _transportService;
public FooService(ITransportService transportService)
{
_transportService = transportService;
_transportService.Connect();
}
public IDisposable Subscribe(IObserver<FooData> observer)
{
return _transportService.GetConnectionStateObservable()
.Select(cs => cs == ConnectionState.Open)
.DistinctUntilChanged()
.Select(isOpen => isOpen
? _transportService.GetObservable() //if open, return observable
: Observable.Start(() => _transportService.Connect()) //if not open, call connect and wait for connection to open
.IgnoreElements()
.Select(_ => default(FooData))
.Concat(Observable.Never<FooData>())
)
.Switch()
.Subscribe(observer);
}
}
public interface ITransportService
{
IObservable<ConnectionState> GetConnectionStateObservable();
bool Connect();
IObservable<FooData> GetObservable();
}
public class FooData
{
public int Id { get; set; }
public string Msg { get; set; }
}
public enum ConnectionState
{
Open,
Close
}
public class FooMockTransportService : ITransportService
{
public ConnectionState State { get; set; }
private BehaviorSubject<ConnectionState> _connectionSubject = new BehaviorSubject<ConnectionState>(ConnectionState.Close);
private bool _shouldDisconnect;
public FooMockTransportService()
{
_shouldDisconnect = true;
}
public bool Connect()
{
State = ConnectionState.Open;
_connectionSubject.OnNext(ConnectionState.Open);
return true;
}
public IObservable<ConnectionState> GetConnectionStateObservable()
{
return _connectionSubject.AsObservable();
}
public IObservable<FooData> GetObservable()
{
return Observable.Create<FooData>(
o=>
{
TaskPoolScheduler.Default.Schedule(() =>
{
o.OnNext(new FooData { Id = 1, Msg = "First" });
o.OnNext(new FooData { Id = 2, Msg = "Sec" });
//Simulate disconnection, ony once
if(_shouldDisconnect)
{
_shouldDisconnect = false;
State = ConnectionState.Close;
o.OnError(new Exception("Disconnected"));
_connectionSubject.OnNext(ConnectionState.Close);
}
o.OnNext(new FooData { Id = 3, Msg = "Third" });
o.OnNext(new FooData { Id = 4, Msg = "Fourth" });
});
return () => { };
});
}
}
public class Program
{
class FooObserver : IObserver<FooData>
{
public void OnCompleted()
{
throw new NotImplementedException();
}
public void OnError(Exception error)
{
Console.WriteLine(error);
}
public void OnNext(FooData value)
{
Console.WriteLine(value.Id);
}
}
public static void Main()
{
var transportService = new FooMockTransportService();
var fooService = new FooService(transportService);
var fooObserver = new FooObserver();
var disposable = fooService.Subscribe(fooObserver);
Console.Read();
}
}
代码是兼容的,还包含对 Shlomo 的建议。
当前输出:
1
2
System.Exception: Disconnected
期望的输出,在断开连接时它应该捕获并每 1 秒重试一次以查看它是否已连接:
1
2
1
2
3
4
如果你控制ITransportService
,我会建议添加一个属性:
public interface ITransportService
{
ConnectionState State { get; }
bool Connect();
IObservable<FooData> GetObservable();
IObservable<ConnectionState> GetConnectionStateObservable();
}
一旦您能够以可观察的方式获取状态,生成可观察的对象就会变得更加容易:
public class FooService
{
private ITransportService _transportService;
public FooService(ITransportService transportService)
{
_transportService = transportService;
_transportService.Connect();
}
public IDisposable Subscribe(IObserver<FooData> observer)
{
return _transportService.GetConnectionStateObservable()
.Select(cs => cs == ConnectionState.Open)
.DistinctUntilChanged()
.Select(isOpen => isOpen
? _transportService.GetObservable() //if open, return observable
: Observable.Start(() => _transportService.Connect()) //if not open, call connect and wait for connection to open
.IgnoreElements()
.Select(_ => default(FooData))
.Concat(Observable.Never<FooData>())
)
.Switch()
.Subscribe(observer);
}
}
如果你不控制ITransportService
,我建议创建一个继承自它的接口,你可以在其中添加类似的属性。
顺便说一句,我建议您放弃 FooObserver
,您几乎不需要塑造自己的观察者。公开 Observable,然后在 Observable 上调用 Subscribe
重载通常可以达到目的。
虽然我无法测试任何这些:重试逻辑应该是什么样子,Connect
的 return 值是什么意思,或者 ConnectionState
class 是,代码无法编译。您应该尝试将问题设计为 mcve.
更新:
下面按预期处理测试代码:
public IDisposable Subscribe(IObserver<FooData> observer)
{
return _transportService.GetConnectionStateObservable()
.Select(cs => cs == ConnectionState.Open)
.DistinctUntilChanged()
.Select(isOpen => isOpen
? _transportService.GetObservable() //if open, return observable
.Catch(Observable.Never<FooData>())
: Observable.Start(() => _transportService.Connect()) //if not open, call connect and wait for connection to open
.IgnoreElements()
.Select(_ => default(FooData))
.Concat(Observable.Never<FooData>())
)
.Switch()
.Subscribe(observer);
}
唯一与原始发布代码不同的是额外的 .Catch(Observable.Never<FooData>())
。如所写,此代码将永远 运行。我希望你有一些方法来终止发布内容之外的可观察对象。
阐述我的评论:
正如 Shlomo 已经在他的回答中展示的那样,您可以如何利用可观察到的连接状态,我猜您想要的是在断开连接发生时再次订阅它。
为此使用 Observable.Defer
return Observable.Defer(() => your final observable)
现在断开连接,如果您想再次订阅,请使用重试
return Observable.Defer(() => your final observable).Retry(3)
但您可能需要延迟重试,线性或指数退避
策略,为此使用 DelaySubscription
return Observable.Defer(() => your_final_observable.DelaySubscription(strategy)).Retry(3)
这是最终代码,每秒重试一次:
public IDisposable Subscribe(IObserver<FooData> observer)
{
return Observable.Defer(() => {
return _transportService.GetConnectionStateObservable()
.Select(cs => cs == ConnectionState.Open)
.DistinctUntilChanged()
.Select(isOpen => isOpen
? _transportService.GetObservable() //if open, return observable
: Observable.Start(() => _transportService.Connect()) //if not open, call connect and wait for connection to open
.IgnoreElements()
.Select(_ => default(FooData))
.Concat(Observable.Never<FooData>())
)
.Switch().DelaySubscription(TimeSpan.FromSeconds(1));
})
.Retry(2)
.Subscribe(observer);
}
需要记住的一些注意事项:
这个 DelaySubscription 也会延迟第一次调用,所以如果有问题,请创建一个计数变量,并且只有当计数 > 0 时才使用 DelaySubscription,否则使用普通的可观察对象。
您不能编写像这样有效执行的 Rx 代码:
o.OnNext(new FooData { Id = 1, Msg = "First" });
o.OnNext(new FooData { Id = 2, Msg = "Sec" });
o.OnError(new Exception("Disconnected"));
o.OnNext(new FooData { Id = 3, Msg = "Third" });
o.OnNext(new FooData { Id = 4, Msg = "Fourth" });
可观察对象的契约是零个或多个值的流,以错误或完整信号结束。它们无法发出更多值。
现在,我很欣赏此代码可能用于测试目的,但如果您创建一个不可能的流,您最终将编写不可能的代码。
根据 Shlomo 的回答,正确的方法是使用 .Switch()
。如果连接有一些延迟,那么 GetConnectionStateObservable
在连接时应该只 return 一个值。 Shlomo 的回答仍然正确。
下面是以下代码:
public class FooService
{
private ITransportService _transportService;
public FooService(ITransportService transportService)
{
_transportService = transportService;
_transportService.Connect();
}
public IDisposable Subscribe(IObserver<FooData> observer)
{
return _transportService.GetObservable()
.Subscribe(observer);
}
}
public interface ITransportService
{
ConnectionState State { get; }
bool Connect();
IObservable<FooData> GetObservable();
}
public class ClientConsumingProgram
{
class FooObserver : IObserver<FooData>
{
public void OnNext(FooData value)
{
//Client consuming without interruption
}
//.. on error.. onCompleted
}
public static void Main()
{
var fooService = new FooService(transportService);
var fooObserver = new FooObserver();
var disposable = fooService.Subscribe(fooObserver);
}
}
我想执行以下操作:
当传输服务断开连接时(从服务器关闭套接字),我希望应用程序重试几次,但 foo 服务首先需要在 _transportService
上调用 Connect
,然后 [=18] =] 已连接,调用 GetObservable.
如果 _transportService
在最大重试之前再次连接,FooObserver
上的 OnNext
会在客户端保持滴答,并且一旦超过最大重试次数,就应该触发错误 OnError。
有人可以为我指出正确的实施方向吗?
更新
public class FooService
{
private ITransportService _transportService;
public FooService(ITransportService transportService)
{
_transportService = transportService;
_transportService.Connect();
}
public IDisposable Subscribe(IObserver<FooData> observer)
{
return _transportService.GetConnectionStateObservable()
.Select(cs => cs == ConnectionState.Open)
.DistinctUntilChanged()
.Select(isOpen => isOpen
? _transportService.GetObservable() //if open, return observable
: Observable.Start(() => _transportService.Connect()) //if not open, call connect and wait for connection to open
.IgnoreElements()
.Select(_ => default(FooData))
.Concat(Observable.Never<FooData>())
)
.Switch()
.Subscribe(observer);
}
}
public interface ITransportService
{
IObservable<ConnectionState> GetConnectionStateObservable();
bool Connect();
IObservable<FooData> GetObservable();
}
public class FooData
{
public int Id { get; set; }
public string Msg { get; set; }
}
public enum ConnectionState
{
Open,
Close
}
public class FooMockTransportService : ITransportService
{
public ConnectionState State { get; set; }
private BehaviorSubject<ConnectionState> _connectionSubject = new BehaviorSubject<ConnectionState>(ConnectionState.Close);
private bool _shouldDisconnect;
public FooMockTransportService()
{
_shouldDisconnect = true;
}
public bool Connect()
{
State = ConnectionState.Open;
_connectionSubject.OnNext(ConnectionState.Open);
return true;
}
public IObservable<ConnectionState> GetConnectionStateObservable()
{
return _connectionSubject.AsObservable();
}
public IObservable<FooData> GetObservable()
{
return Observable.Create<FooData>(
o=>
{
TaskPoolScheduler.Default.Schedule(() =>
{
o.OnNext(new FooData { Id = 1, Msg = "First" });
o.OnNext(new FooData { Id = 2, Msg = "Sec" });
//Simulate disconnection, ony once
if(_shouldDisconnect)
{
_shouldDisconnect = false;
State = ConnectionState.Close;
o.OnError(new Exception("Disconnected"));
_connectionSubject.OnNext(ConnectionState.Close);
}
o.OnNext(new FooData { Id = 3, Msg = "Third" });
o.OnNext(new FooData { Id = 4, Msg = "Fourth" });
});
return () => { };
});
}
}
public class Program
{
class FooObserver : IObserver<FooData>
{
public void OnCompleted()
{
throw new NotImplementedException();
}
public void OnError(Exception error)
{
Console.WriteLine(error);
}
public void OnNext(FooData value)
{
Console.WriteLine(value.Id);
}
}
public static void Main()
{
var transportService = new FooMockTransportService();
var fooService = new FooService(transportService);
var fooObserver = new FooObserver();
var disposable = fooService.Subscribe(fooObserver);
Console.Read();
}
}
代码是兼容的,还包含对 Shlomo 的建议。 当前输出:
1
2
System.Exception: Disconnected
期望的输出,在断开连接时它应该捕获并每 1 秒重试一次以查看它是否已连接:
1
2
1
2
3
4
如果你控制ITransportService
,我会建议添加一个属性:
public interface ITransportService
{
ConnectionState State { get; }
bool Connect();
IObservable<FooData> GetObservable();
IObservable<ConnectionState> GetConnectionStateObservable();
}
一旦您能够以可观察的方式获取状态,生成可观察的对象就会变得更加容易:
public class FooService
{
private ITransportService _transportService;
public FooService(ITransportService transportService)
{
_transportService = transportService;
_transportService.Connect();
}
public IDisposable Subscribe(IObserver<FooData> observer)
{
return _transportService.GetConnectionStateObservable()
.Select(cs => cs == ConnectionState.Open)
.DistinctUntilChanged()
.Select(isOpen => isOpen
? _transportService.GetObservable() //if open, return observable
: Observable.Start(() => _transportService.Connect()) //if not open, call connect and wait for connection to open
.IgnoreElements()
.Select(_ => default(FooData))
.Concat(Observable.Never<FooData>())
)
.Switch()
.Subscribe(observer);
}
}
如果你不控制ITransportService
,我建议创建一个继承自它的接口,你可以在其中添加类似的属性。
顺便说一句,我建议您放弃 FooObserver
,您几乎不需要塑造自己的观察者。公开 Observable,然后在 Observable 上调用 Subscribe
重载通常可以达到目的。
虽然我无法测试任何这些:重试逻辑应该是什么样子,Connect
的 return 值是什么意思,或者 ConnectionState
class 是,代码无法编译。您应该尝试将问题设计为 mcve.
更新:
下面按预期处理测试代码:
public IDisposable Subscribe(IObserver<FooData> observer)
{
return _transportService.GetConnectionStateObservable()
.Select(cs => cs == ConnectionState.Open)
.DistinctUntilChanged()
.Select(isOpen => isOpen
? _transportService.GetObservable() //if open, return observable
.Catch(Observable.Never<FooData>())
: Observable.Start(() => _transportService.Connect()) //if not open, call connect and wait for connection to open
.IgnoreElements()
.Select(_ => default(FooData))
.Concat(Observable.Never<FooData>())
)
.Switch()
.Subscribe(observer);
}
唯一与原始发布代码不同的是额外的 .Catch(Observable.Never<FooData>())
。如所写,此代码将永远 运行。我希望你有一些方法来终止发布内容之外的可观察对象。
阐述我的评论:
正如 Shlomo 已经在他的回答中展示的那样,您可以如何利用可观察到的连接状态,我猜您想要的是在断开连接发生时再次订阅它。
为此使用 Observable.Defer
return Observable.Defer(() => your final observable)
现在断开连接,如果您想再次订阅,请使用重试
return Observable.Defer(() => your final observable).Retry(3)
但您可能需要延迟重试,线性或指数退避 策略,为此使用 DelaySubscription
return Observable.Defer(() => your_final_observable.DelaySubscription(strategy)).Retry(3)
这是最终代码,每秒重试一次:
public IDisposable Subscribe(IObserver<FooData> observer)
{
return Observable.Defer(() => {
return _transportService.GetConnectionStateObservable()
.Select(cs => cs == ConnectionState.Open)
.DistinctUntilChanged()
.Select(isOpen => isOpen
? _transportService.GetObservable() //if open, return observable
: Observable.Start(() => _transportService.Connect()) //if not open, call connect and wait for connection to open
.IgnoreElements()
.Select(_ => default(FooData))
.Concat(Observable.Never<FooData>())
)
.Switch().DelaySubscription(TimeSpan.FromSeconds(1));
})
.Retry(2)
.Subscribe(observer);
}
需要记住的一些注意事项:
这个 DelaySubscription 也会延迟第一次调用,所以如果有问题,请创建一个计数变量,并且只有当计数 > 0 时才使用 DelaySubscription,否则使用普通的可观察对象。
您不能编写像这样有效执行的 Rx 代码:
o.OnNext(new FooData { Id = 1, Msg = "First" });
o.OnNext(new FooData { Id = 2, Msg = "Sec" });
o.OnError(new Exception("Disconnected"));
o.OnNext(new FooData { Id = 3, Msg = "Third" });
o.OnNext(new FooData { Id = 4, Msg = "Fourth" });
可观察对象的契约是零个或多个值的流,以错误或完整信号结束。它们无法发出更多值。
现在,我很欣赏此代码可能用于测试目的,但如果您创建一个不可能的流,您最终将编写不可能的代码。
根据 Shlomo 的回答,正确的方法是使用 .Switch()
。如果连接有一些延迟,那么 GetConnectionStateObservable
在连接时应该只 return 一个值。 Shlomo 的回答仍然正确。