替换 Rx.NET 表示状态的主题的方法

Ways to replace Rx.NET Subject which representing state

我目前正在修复以下方法中的错误,该方法在 null 时轮询 stateChecker 条件,直到它变为 true(或由于超时而变为 false):

private static void WaitWithSubject(
   Func<bool> stateChecker,
   TimeSpan timeout,
   TimeSpan stepTime, 
   string errorMessage,
   ILifetimeInfo lifetimeInfo)
{
   (bool? IsOk, string Message) state = (IsOk: null, Message: string.Empty);
   var waitCancellation = (int)stepTime.TotalMilliseconds;
   using (var stateSubject = new Subject<(bool? IsOk, string Message)>())
   {
      using (Observable.Timer(timeout).Subscribe(it => stateSubject.OnNext((IsOk: false, Message: errorMessage))))
      using (Observable.Timer(TimeSpan.Zero, stepTime).
         Subscribe(it =>
         {
            if (stateChecker())
               stateSubject.OnNext((IsOk: true, Message: string.Empty));
         }))
      {
         using (stateSubject.Subscribe(it => state = it))
         {
            while (state.IsOk == null)
               lifetimeInfo.Canceler.ThrowIfCancellationRequested(waitCancellation);
            if (state.IsOk != true)
               throw new TimeoutException(state.Message);
            stateSubject.OnCompleted();
         }
      }
   }
}

此方法偶尔会在执行方法 OnNext 的代码中的以下位置生成 ObjectDisposedException :

if ( stateChecker() )
    stateSubject.OnNext( ( IsOk: true, Message: string.Empty ) );

有没有办法完全避免在这种情况下使用 Subject 而使用 Observable.IntervalObservable.Create 之类的东西?

这是一个与 WaitWithSubject 具有相似行为的方法,但不使用 Subject<T>。它使用 Merge 运算符,以便将两个定时器生成的序列组合成一个序列。它还具有取消支持。

public static void WaitUntilTrueState(
    Func<bool> stateChecker,
    TimeSpan checkInterval,
    TimeSpan timeout,
    string timeoutMessage,
    CancellationToken cancellationToken = default)
{
    Observable
        .Timer(TimeSpan.Zero, checkInterval)
        .Merge(Observable.Timer(timeout).IgnoreElements()
            .Concat(Observable.Throw<long>(new TimeoutException(timeoutMessage))))
        .TakeUntil(_ => stateChecker())
        .RunAsync(cancellationToken)
        .Wait();
}

在我看来,这就是你想要做的事情:

private static void WaitWithSubject(Func<bool> stateChecker, TimeSpan timeout, TimeSpan stepTime, string errorMessage, ILifetimeInfo lifetimeInfo) =>
    Observable
        .Amb(
            Observable
                .Timer(timeout)
                .SelectMany(_ => Observable.Throw<Unit>(new TimeoutException(errorMessage))),
            Observable
                .Timer(TimeSpan.Zero, stepTime)
                .Where(_ => stateChecker())
                .Select(_ => Unit.Default))
        .Take(1)
        .Wait();

这里的关键是 Amb 运算符,它启动两个序列,并且只有第一个序列的 returns 值才能产生值或错误。 Take(1) 确保可观察对象在产生值后立即完成。

如果您有 CancellationToken,您可以在 Wait() 之前添加以下行以取消:

.TakeUntil(Observable.Create<Unit>(o => ct.Register(() => o.OnNext(Unit.Default))))

在与 Theodor 来回交流之后,我想出了这个版本,我认为它可能是我能想到的最干净的版本:

private static void WaitWithSubject(Func<bool> stateChecker, TimeSpan timeout, TimeSpan stepTime, string errorMessage, ILifetimeInfo lifetimeInfo)
{
    var good =
        Observable
            .Timer(TimeSpan.Zero, stepTime)
            .Where(_ => stateChecker())
            .Take(1);

    var fail =
        Observable
            .Timer(timeout)
            .SelectMany(_ => Observable.Throw<long>(new TimeoutException(errorMessage)));
    
    good.Merge(fail).RunAsync(lifetimeInfo.Canceler).Wait();
}