替换 Rx.NET 表示状态的主题的方法
Ways to replace Rx.NET Subject which representing state
我目前正在修复以下方法中的错误,该方法在 null
时轮询 stateChecker
条件,直到它变为 true
(或由于超时而变为 false
):
private static void WaitWithSubject(
Func<bool> stateChecker,
TimeSpan timeout,
TimeSpan stepTime,
string errorMessage,
ILifetimeInfo lifetimeInfo)
{
(bool? IsOk, string Message) state = (IsOk: null, Message: string.Empty);
var waitCancellation = (int)stepTime.TotalMilliseconds;
using (var stateSubject = new Subject<(bool? IsOk, string Message)>())
{
using (Observable.Timer(timeout).Subscribe(it => stateSubject.OnNext((IsOk: false, Message: errorMessage))))
using (Observable.Timer(TimeSpan.Zero, stepTime).
Subscribe(it =>
{
if (stateChecker())
stateSubject.OnNext((IsOk: true, Message: string.Empty));
}))
{
using (stateSubject.Subscribe(it => state = it))
{
while (state.IsOk == null)
lifetimeInfo.Canceler.ThrowIfCancellationRequested(waitCancellation);
if (state.IsOk != true)
throw new TimeoutException(state.Message);
stateSubject.OnCompleted();
}
}
}
}
此方法偶尔会在执行方法 OnNext
的代码中的以下位置生成 ObjectDisposedException
:
if ( stateChecker() )
stateSubject.OnNext( ( IsOk: true, Message: string.Empty ) );
有没有办法完全避免在这种情况下使用 Subject 而使用 Observable.Interval
或 Observable.Create
之类的东西?
这是一个与 WaitWithSubject
具有相似行为的方法,但不使用 Subject<T>
。它使用 Merge
运算符,以便将两个定时器生成的序列组合成一个序列。它还具有取消支持。
public static void WaitUntilTrueState(
Func<bool> stateChecker,
TimeSpan checkInterval,
TimeSpan timeout,
string timeoutMessage,
CancellationToken cancellationToken = default)
{
Observable
.Timer(TimeSpan.Zero, checkInterval)
.Merge(Observable.Timer(timeout).IgnoreElements()
.Concat(Observable.Throw<long>(new TimeoutException(timeoutMessage))))
.TakeUntil(_ => stateChecker())
.RunAsync(cancellationToken)
.Wait();
}
在我看来,这就是你想要做的事情:
private static void WaitWithSubject(Func<bool> stateChecker, TimeSpan timeout, TimeSpan stepTime, string errorMessage, ILifetimeInfo lifetimeInfo) =>
Observable
.Amb(
Observable
.Timer(timeout)
.SelectMany(_ => Observable.Throw<Unit>(new TimeoutException(errorMessage))),
Observable
.Timer(TimeSpan.Zero, stepTime)
.Where(_ => stateChecker())
.Select(_ => Unit.Default))
.Take(1)
.Wait();
这里的关键是 Amb
运算符,它启动两个序列,并且只有第一个序列的 returns 值才能产生值或错误。 Take(1)
确保可观察对象在产生值后立即完成。
如果您有 CancellationToken
,您可以在 Wait()
之前添加以下行以取消:
.TakeUntil(Observable.Create<Unit>(o => ct.Register(() => o.OnNext(Unit.Default))))
在与 Theodor 来回交流之后,我想出了这个版本,我认为它可能是我能想到的最干净的版本:
private static void WaitWithSubject(Func<bool> stateChecker, TimeSpan timeout, TimeSpan stepTime, string errorMessage, ILifetimeInfo lifetimeInfo)
{
var good =
Observable
.Timer(TimeSpan.Zero, stepTime)
.Where(_ => stateChecker())
.Take(1);
var fail =
Observable
.Timer(timeout)
.SelectMany(_ => Observable.Throw<long>(new TimeoutException(errorMessage)));
good.Merge(fail).RunAsync(lifetimeInfo.Canceler).Wait();
}
我目前正在修复以下方法中的错误,该方法在 null
时轮询 stateChecker
条件,直到它变为 true
(或由于超时而变为 false
):
private static void WaitWithSubject(
Func<bool> stateChecker,
TimeSpan timeout,
TimeSpan stepTime,
string errorMessage,
ILifetimeInfo lifetimeInfo)
{
(bool? IsOk, string Message) state = (IsOk: null, Message: string.Empty);
var waitCancellation = (int)stepTime.TotalMilliseconds;
using (var stateSubject = new Subject<(bool? IsOk, string Message)>())
{
using (Observable.Timer(timeout).Subscribe(it => stateSubject.OnNext((IsOk: false, Message: errorMessage))))
using (Observable.Timer(TimeSpan.Zero, stepTime).
Subscribe(it =>
{
if (stateChecker())
stateSubject.OnNext((IsOk: true, Message: string.Empty));
}))
{
using (stateSubject.Subscribe(it => state = it))
{
while (state.IsOk == null)
lifetimeInfo.Canceler.ThrowIfCancellationRequested(waitCancellation);
if (state.IsOk != true)
throw new TimeoutException(state.Message);
stateSubject.OnCompleted();
}
}
}
}
此方法偶尔会在执行方法 OnNext
的代码中的以下位置生成 ObjectDisposedException
:
if ( stateChecker() )
stateSubject.OnNext( ( IsOk: true, Message: string.Empty ) );
有没有办法完全避免在这种情况下使用 Subject 而使用 Observable.Interval
或 Observable.Create
之类的东西?
这是一个与 WaitWithSubject
具有相似行为的方法,但不使用 Subject<T>
。它使用 Merge
运算符,以便将两个定时器生成的序列组合成一个序列。它还具有取消支持。
public static void WaitUntilTrueState(
Func<bool> stateChecker,
TimeSpan checkInterval,
TimeSpan timeout,
string timeoutMessage,
CancellationToken cancellationToken = default)
{
Observable
.Timer(TimeSpan.Zero, checkInterval)
.Merge(Observable.Timer(timeout).IgnoreElements()
.Concat(Observable.Throw<long>(new TimeoutException(timeoutMessage))))
.TakeUntil(_ => stateChecker())
.RunAsync(cancellationToken)
.Wait();
}
在我看来,这就是你想要做的事情:
private static void WaitWithSubject(Func<bool> stateChecker, TimeSpan timeout, TimeSpan stepTime, string errorMessage, ILifetimeInfo lifetimeInfo) =>
Observable
.Amb(
Observable
.Timer(timeout)
.SelectMany(_ => Observable.Throw<Unit>(new TimeoutException(errorMessage))),
Observable
.Timer(TimeSpan.Zero, stepTime)
.Where(_ => stateChecker())
.Select(_ => Unit.Default))
.Take(1)
.Wait();
这里的关键是 Amb
运算符,它启动两个序列,并且只有第一个序列的 returns 值才能产生值或错误。 Take(1)
确保可观察对象在产生值后立即完成。
如果您有 CancellationToken
,您可以在 Wait()
之前添加以下行以取消:
.TakeUntil(Observable.Create<Unit>(o => ct.Register(() => o.OnNext(Unit.Default))))
在与 Theodor 来回交流之后,我想出了这个版本,我认为它可能是我能想到的最干净的版本:
private static void WaitWithSubject(Func<bool> stateChecker, TimeSpan timeout, TimeSpan stepTime, string errorMessage, ILifetimeInfo lifetimeInfo)
{
var good =
Observable
.Timer(TimeSpan.Zero, stepTime)
.Where(_ => stateChecker())
.Take(1);
var fail =
Observable
.Timer(timeout)
.SelectMany(_ => Observable.Throw<long>(new TimeoutException(errorMessage)));
good.Merge(fail).RunAsync(lifetimeInfo.Canceler).Wait();
}