Rx.net 在 observable 的 disconnect/error 上实现重试功能

Rx.net implement retry functionality on disconnect/error in observable

下面是以下代码:

    public class FooService
    {
        private ITransportService _transportService;
        public FooService(ITransportService transportService)
        {
            _transportService = transportService;
            _transportService.Connect();
        }

        public IDisposable Subscribe(IObserver<FooData> observer)
        {
            return _transportService.GetObservable()
                .Subscribe(observer);
        }
    }

    public interface ITransportService
    {
        ConnectionState State { get; }
        bool Connect();
        IObservable<FooData> GetObservable();
    }

    public class ClientConsumingProgram
    {
        class FooObserver : IObserver<FooData>
        {
            public void OnNext(FooData value)
            {
                //Client consuming without interruption
            }
            //.. on error.. onCompleted
        }
        public static void Main()
        {
            var fooService = new FooService(transportService);
            var fooObserver = new FooObserver();
            var disposable = fooService.Subscribe(fooObserver);
        }
    }

我想执行以下操作:

当传输服务断开连接时(从服务器关闭套接字),我希望应用程序重试几次,但 foo 服务首先需要在 _transportService 上调用 Connect,然后 [=18] =] 已连接,调用 GetObservable.

如果 _transportService 在最大重试之前再次连接,FooObserver 上的 OnNext 会在客户端保持滴答,并且一旦超过最大重试次数,就应该触发错误 OnError。

有人可以为我指出正确的实施方向吗?


更新

public class FooService
{
    private ITransportService _transportService;
    public FooService(ITransportService transportService)
    {
        _transportService = transportService;
        _transportService.Connect();
    }

    public IDisposable Subscribe(IObserver<FooData> observer)
    {
        return _transportService.GetConnectionStateObservable()
        .Select(cs => cs == ConnectionState.Open)
        .DistinctUntilChanged()
        .Select(isOpen => isOpen
            ? _transportService.GetObservable()   //if open, return observable
            : Observable.Start(() => _transportService.Connect()) //if not open, call connect and wait for connection to open
                .IgnoreElements()
                .Select(_ => default(FooData))
                .Concat(Observable.Never<FooData>())
        )
        .Switch()
        .Subscribe(observer);
    }
}

public interface ITransportService
{
    IObservable<ConnectionState> GetConnectionStateObservable();
    bool Connect();
    IObservable<FooData> GetObservable();
}

public class FooData
{
    public int Id { get; set; }
    public string Msg { get; set; }
}

public enum ConnectionState
{
    Open,
    Close
}

public class FooMockTransportService : ITransportService
{
    public ConnectionState State { get; set; }
    private BehaviorSubject<ConnectionState> _connectionSubject = new BehaviorSubject<ConnectionState>(ConnectionState.Close);
    private bool _shouldDisconnect;

    public FooMockTransportService()
    {
        _shouldDisconnect = true;
    }

    public bool Connect()
    {
        State = ConnectionState.Open;
        _connectionSubject.OnNext(ConnectionState.Open);
        return true;
    }

    public IObservable<ConnectionState> GetConnectionStateObservable()
    {
        return _connectionSubject.AsObservable();
    }

    public IObservable<FooData> GetObservable()
    {
        return Observable.Create<FooData>(
            o=>
            {
                TaskPoolScheduler.Default.Schedule(() =>
                {
                    o.OnNext(new FooData { Id = 1, Msg = "First" });
                    o.OnNext(new FooData { Id = 2, Msg = "Sec" });

                    //Simulate disconnection, ony once
                    if(_shouldDisconnect)
                    {
                        _shouldDisconnect = false;
                        State = ConnectionState.Close;
                        o.OnError(new Exception("Disconnected"));
                        _connectionSubject.OnNext(ConnectionState.Close);
                    }

                    o.OnNext(new FooData { Id = 3, Msg = "Third" });
                    o.OnNext(new FooData { Id = 4, Msg = "Fourth" });
                });
                return () => { };
            });
    }
}

public class Program
{
    class FooObserver : IObserver<FooData>
    {
        public void OnCompleted()
        {
            throw new NotImplementedException();
        }

        public void OnError(Exception error)
        {
            Console.WriteLine(error);
        }

        public void OnNext(FooData value)
        {
            Console.WriteLine(value.Id);
        }
    }
    public static void Main()
    {
        var transportService = new FooMockTransportService();
        var fooService = new FooService(transportService);
        var fooObserver = new FooObserver();
        var disposable = fooService.Subscribe(fooObserver);
        Console.Read();
    }
}

代码是兼容的,还包含对 Shlomo 的建议。 当前输出:

1
2
System.Exception: Disconnected

期望的输出,在断开连接时它应该捕获并每 1 秒重试一次以查看它是否已连接:

1
2
1
2
3
4

如果你控制ITransportService,我会建议添加一个属性:

public interface ITransportService
{
    ConnectionState State { get; }
    bool Connect();
    IObservable<FooData> GetObservable();
    IObservable<ConnectionState> GetConnectionStateObservable();
}

一旦您能够以可观察的方式获取状态,生成可观察的对象就会变得更加容易:

public class FooService
{
    private ITransportService _transportService;
    public FooService(ITransportService transportService)
    {
        _transportService = transportService;
        _transportService.Connect();
    }

    public IDisposable Subscribe(IObserver<FooData> observer)
    {
        return _transportService.GetConnectionStateObservable()
            .Select(cs => cs == ConnectionState.Open)
            .DistinctUntilChanged()
            .Select(isOpen => isOpen 
                ? _transportService.GetObservable()   //if open, return observable
                : Observable.Start(() => _transportService.Connect()) //if not open, call connect and wait for connection to open
                    .IgnoreElements()
                    .Select(_ => default(FooData))
                    .Concat(Observable.Never<FooData>())
            )
            .Switch()
            .Subscribe(observer);
    }
}

如果你不控制ITransportService,我建议创建一个继承自它的接口,你可以在其中添加类似的属性。

顺便说一句,我建议您放弃 FooObserver,您几乎不需要塑造自己的观察者。公开 Observable,然后在 Observable 上调用 Subscribe 重载通常可以达到目的。

虽然我无法测试任何这些:重试逻辑应该是什么样子,Connect 的 return 值是什么意思,或者 ConnectionState class 是,代码无法编译。您应该尝试将问题设计为 mcve.


更新:

下面按预期处理测试代码:

public IDisposable Subscribe(IObserver<FooData> observer)
{
    return _transportService.GetConnectionStateObservable()
        .Select(cs => cs == ConnectionState.Open)
        .DistinctUntilChanged()
        .Select(isOpen => isOpen
            ? _transportService.GetObservable()   //if open, return observable
                .Catch(Observable.Never<FooData>())
            : Observable.Start(() => _transportService.Connect()) //if not open, call connect and wait for connection to open
                .IgnoreElements()
                .Select(_ => default(FooData))
                .Concat(Observable.Never<FooData>())
        )
        .Switch()
        .Subscribe(observer);
}

唯一与原始发布代码不同的是额外的 .Catch(Observable.Never<FooData>())。如所写,此代码将永远 运行。我希望你有一些方法来终止发布内容之外的可观察对象。

阐述我的评论:

正如 Shlomo 已经在他的回答中展示的那样,您可以如何利用可观察到的连接状态,我猜您想要的是在断开连接发生时再次订阅它。

为此使用 Observable.Defer

return Observable.Defer(() => your final observable)

现在断开连接,如果您想再次订阅,请使用重试

return Observable.Defer(() => your final observable).Retry(3)

但您可能需要延迟重试,线性或指数退避 策略,为此使用 DelaySubscription

return Observable.Defer(() => your_final_observable.DelaySubscription(strategy)).Retry(3)

这是最终代码,每秒重试一次:

        public IDisposable Subscribe(IObserver<FooData> observer)
        {
            return Observable.Defer(() => {
                return _transportService.GetConnectionStateObservable()
            .Select(cs => cs == ConnectionState.Open)
            .DistinctUntilChanged()
            .Select(isOpen => isOpen
                ? _transportService.GetObservable()   //if open, return observable
                : Observable.Start(() => _transportService.Connect()) //if not open, call connect and wait for connection to open
                    .IgnoreElements()
                    .Select(_ => default(FooData))
                    .Concat(Observable.Never<FooData>())
            )
            .Switch().DelaySubscription(TimeSpan.FromSeconds(1));
            })
            .Retry(2)
            .Subscribe(observer);
        }

需要记住的一些注意事项:

这个 DelaySubscription 也会延迟第一次调用,所以如果有问题,请创建一个计数变量,并且只有当计数 > 0 时才使用 DelaySubscription,否则使用普通的可观察对象。

您不能编写像这样有效执行的 Rx 代码:

o.OnNext(new FooData { Id = 1, Msg = "First" });
o.OnNext(new FooData { Id = 2, Msg = "Sec" });

o.OnError(new Exception("Disconnected"));

o.OnNext(new FooData { Id = 3, Msg = "Third" });
o.OnNext(new FooData { Id = 4, Msg = "Fourth" });

可观察对象的契约是零个或多个值的流,以错误或完整信号结束。它们无法发出更多值。

现在,我很欣赏此代码可能用于测试目的,但如果您创建一个不可能的流,您最终将编写不可能的代码。

根据 Shlomo 的回答,正确的方法是使用 .Switch()。如果连接有一些延迟,那么 GetConnectionStateObservable 在连接时应该只 return 一个值。 Shlomo 的回答仍然正确。