可观察定时器处理

Observable timers disposing

我正在使用 Reactive .NET 扩展,我想知道它的处理方式。我知道在某些情况下,这样处理它很好:.TakeUntil(Observable.Timer(TimeSpan.FromMinutes(x)))。我

第一个案例

在这种情况下,我有一个计时器,它会在 x 秒后触发,然后完成并应该被处理掉。

public void ScheduleOrderCancellationIfNotFilled(string pair, long orderId, int waitSecondsBeforeCancel)
{
    Observable.Timer(TimeSpan.FromSeconds(waitSecondsBeforeCancel))
        .Do(e =>
        {
            var result = _client.Spot.Order.GetOrder(pair, orderId);

            if (result.Success)
            {
                if (result.Data?.Status != OrderStatus.Filled)
                {
                    _client.Spot.Order.CancelOrder(pair, orderId);
                }
            }
        })
        .Subscribe();
}

第二种情况

在这种情况下,计时器在第一秒运行,然后每 29 分钟重复一次。这应该一直存在,直到它的定义 class 被释放。我相信这个应该通过 IDisposable 实现来处理。怎么样?

var keepAliveListenKey = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(29))
    .Do(async e =>
    {
        await KeepAliveListenKeyAsync().ConfigureAwait(false);
    })
    .Subscribe();

编辑

我还希望它使用 Subject<T>,这样可以更轻松地处理和重置订阅。

例如。 (@Enigmativity)

public class UploadDicomSet : ImportBaseSet
{
    IDisposable subscription;
    Subject<IObservable<long>> subject = new Subject<IObservable<long>>();

    public UploadDicomSet()
    {
        subscription = subject.Switch().Subscribe(s => CheckUploadSetList(s));
        subject.OnNext(Observable.Interval(TimeSpan.FromMinutes(2)));
    }

    void CheckUploadSetList(long interval)
    {
        subject.OnNext(Observable.Never<long>());
        // Do other things
    }

    public void AddDicomFile(SharedLib.DicomFile dicomFile)
    {
        subject.OnNext(Observable.Interval(TimeSpan.FromMinutes(2)));
        // Reset the subscription to go off in 2 minutes from now
        // Do other things
    }
}

Reactive Observable Subscription Disposal(@Enigmativity)

The disposable returned by the Subscribe extension methods is returned solely to allow you to manually unsubscribe from the observable before the observable naturally ends.

If the observable completes - with either OnCompleted or OnError - then the subscription is already disposed for you.

One important thing to note: the garbage collector never calls .Dispose() on observable subscriptions, so you must dispose of your subscriptions if they have not (or may not have) naturally ended before your subscription goes out of scope.

第一个案例

看来我不需要手动 .Dispose() 第一种情况下的订阅,因为它自然结束。

最后触发 Dispose。

var xs = Observable.Create<long>(o =>
{
    var d = Observable.Timer(TimeSpan.FromSeconds(5))
        .Do(e =>
        {
            Console.WriteLine("5 seconds elapsed.");
        })
        .Subscribe(o);

    return Disposable.Create(() =>
    {
        Console.WriteLine("Disposed!");
        d.Dispose();
    });
});

var subscription = xs.Subscribe(x => Console.WriteLine(x));

第二种情况

但在第二种情况下,它不会“自然”结束,我应该处理掉它。

除非手动处理,否则不会触发处理。

var xs = Observable.Create<long>(o =>
{
    var d = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1))
        .Do(e =>
        {
            Console.WriteLine("Test.");
        })
        .Subscribe(o);

    return Disposable.Create(() =>
    {
        Console.WriteLine("Disposed!");
        d.Dispose();
    });
});

var subscription = xs.Subscribe(x => Console.WriteLine(x));

结论

他给出了很好的例子,如果你也在问自己同样的问题,那值得一看。

第一种情况会自动处理掉。实际上,这是一种实现自动订阅管理的常用方法,而且这绝对是处理 rx.

的一种很好的优雅方式。

在第二种情况下,你设计过度了。 Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)) 本身足以随时间生成一系列上升的 long。由于此流本质上是无穷无尽的,您是对的 - 需要明确的订阅管理。所以有就够了:

var sub = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)).Subscribe()

...sub.Dispose() 稍后。

P.S。请注意,在您的代码中,您 .Do async/await。很可能这不是您想要的。您希望 SelectMany 确保正确等待 async 操作并处理异常。


在评论区回答您的问题:

What about disposing using Subject instead?

嗯,没什么特别的。 IObserver<>IObservable<> 都是由此 class 实现的,因此它类似于 classical .NET 事件(在某些事件上调用的回调列表)。对于您的问题和用例,它在任何意义上都没有区别。

May you give an example about the .Do with exception handling?

当然可以。这个想法是你想将你的 async/await 封装成一些 Task<T>IObservable<T> 这样可以保留取消和错误信号。为此,必须使用 .SelectMany 方法(就像 LINQ 中的 SelectMany,相同的想法)。所以只需将 .Do 更改为 .SelectMany

Observable
    .Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1))
    .SelectMany(_ => Observable.FromAsync(() => /* that's the point where your Task<> becomes Observable */ myTask))

I'm confused again. Do I need IObservable<IObservable> (Select) or IObservable (SelectMany)

很可能,您不需要开关。为什么?因为它的创建主要是为了避免 IO 竞争条件,所以每当发出新事件时,当前事件(可能由于自然并行或异步工作流而正在进行)保证被取消(即取消订阅)。否则竞争条件会(并且将会)损害你的状态。

相反,SelectMany 将确保所有这些都按顺序发生,以某种总顺序它们确实到达了。什么都不会被取消。您将完成(等待,如果您愿意)当前回调,然后触发下一个回调。当然,这种行为可以通过适当的IScheduler来改变,但那是另一回事了。