"long" 间隔后完成可观察序列
Complete Observable Sequence After "long" interval
以下可观察序列将每个元素添加到 ReplaySubject,以便我可以稍后访问任何元素,甚至等待 ReplaySubject 完成。它在达到时间跨度后完成 ReaplySubject。
ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
characteristic.WhenNotificationReceived()
.TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1)))
.Subscribe(
onNext: result =>
{
string nextTag = BitConverter.ToString(result.Data);
nextTag = nextTag.Replace("-", "");
rfidPlayer.OnNext(nextTag);
},
onCompleted: () =>
{
rfidPlayer.OnCompleted();
});
我希望序列 运行 直到自上次 "OnNext" 调用以来的给定时间,然后完成。这在各种蓝牙通信场景中非常有用,在这些场景中,蓝牙设备会给我一个数据序列,然后在没有任何类型的完成消息或事件的情况下停止。在这些场景中,我需要启发式地确定序列何时完成,然后自己完成。因此,如果自上次蓝牙通知以来已经 "too long",我想完成 ReplaySubject。
我可以通过创建一个计时器,在收到每个元素时重置它,然后在计时器达到 "too long" 时完成 ReplaySubject 来完成此操作,但我听说创建一个对象并从在可观察的订阅中不是线程安全的。
关于如何在 "too long" 间隔后完成序列的任何建议?
据我所知,这是一个线程不安全的版本,但应该可以正常工作:
bool reading = true;
System.Timers.Timer timer = new System.Timers.Timer(1000);
timer.Elapsed += (sender, e) =>
{
reading = false;
};
ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
characteristic.WhenNotificationReceived()
.TakeWhile(x => reading)
.Subscribe(
onNext: result =>
{
string nextTag = BitConverter.ToString(result.Data);
nextTag = nextTag.Replace("-", "");
timer.Stop();
timer.Start();
rfidPlayer.OnNext(nextTag);
},
onCompleted: () =>
{
rfidPlayer.OnCompleted();
});
根据 Simonare 的第一个回答,这似乎令人满意:
characteristic.WhenNotificationReceived()
.Timeout(TimeSpan.FromSeconds(1))
.Subscribe(
onNext: result =>
{
string nextTag = BitConverter.ToString(result.Data);
nextTag = nextTag.Replace("-", "");
rfidPlayer.OnNext(nextTag);
},
onError: error =>
{
rfidPlayer.OnCompleted();
});
您可以考虑使用 Timeout Operator 。唯一的缺点是它以错误信号终止。您可能需要处理假错误
The Timeout operator allows you to abort an Observable with an onError termination if that Observable fails to emit any items during a specified span of time.
如果你使用下面的方法你可以超越错误
.Timeout(200, Promise.resolve(42));
Another variant allows you to instruct timeout to switch to a backup Observable that you specify, rather than terminating with an error, if the timeout condition is triggered.
characteristic.WhenNotificationReceived()
.Timeout(TimeSpan.FromSeconds(1))
.Subscribe(
onNext: result =>
{
....
rfidPlayer.OnNext(....);
},
onError: error =>
{
rfidPlayer.OnCompleted();
});
我觉得使用 Timeout
很讨厌,因为有例外。
我更喜欢在序列中注入一个值,我可以用它来终止序列。例如,如果我的序列产生非负数,那么如果我注入一个 -1
我知道结束序列。
这是一个例子:
从这个从 1 开始生成 2 的幂的 observable 开始,它还会将每个值的生成延迟等待值的毫秒数。
Observable
.Generate(1, x => true, x => 2 * x, x => x, x => TimeSpan.FromMilliseconds(x))
所以 1、2、4、8 等,越来越慢。
现在我想停止这个序列,如果在 3.0
秒内没有值,那么我可以这样做:
.Select(x => Observable.Timer(TimeSpan.FromSeconds(3.0)).Select(y => -1).StartWith(x))
.Switch()
.TakeWhile(x => x >= 0)
如果我运行这个序列我得到这个输出:
1
2
4
8
16
32
64
128
256
512
1024
2048
序列即将生成 4096
但它首先等待 4096
毫秒来生成该值 - 同时 Observable.Timer(TimeSpan.FromSeconds(3.0))
触发并输出 -1
因此停止序列。
这个查询的关键部分是Switch
的使用。它采用 IObservable<IObservable<T>>
并通过仅订阅最新的外部可观察对象并取消订阅前一个对象来生成 IObservable<T>
。
因此,在我的查询中,序列产生的每个新值都会停止并重新启动 Timer
。
在您的情况下,您的可观察对象将如下所示:
characteristic
.WhenNotificationReceived()
.Select(result => BitConverter.ToString(result.Data).Replace("-", ""))
.Select(x => Observable.Timer(TimeSpan.FromSeconds(1.0)).Select(y => (string)null).StartWith(x))
.Switch()
.TakeWhile(x => x != null)
.Subscribe(rfidPlayer);
这是您可以使用的自定义运算符 TakeUntilTimeout
,它是内置 Timeout
运算符之上的薄层。
/// <summary>
/// Applies a timeout policy for each element in the observable sequence.
/// If the next element isn't received within the specified timeout duration
/// starting from its predecessor, the sequence terminates.
/// </summary>
public static IObservable<T> TakeUntilTimeout<T>(
this IObservable<T> source,
TimeSpan timeout)
{
return source.Timeout(timeout, Observable.Empty<T>());
}
以下可观察序列将每个元素添加到 ReplaySubject,以便我可以稍后访问任何元素,甚至等待 ReplaySubject 完成。它在达到时间跨度后完成 ReaplySubject。
ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
characteristic.WhenNotificationReceived()
.TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1)))
.Subscribe(
onNext: result =>
{
string nextTag = BitConverter.ToString(result.Data);
nextTag = nextTag.Replace("-", "");
rfidPlayer.OnNext(nextTag);
},
onCompleted: () =>
{
rfidPlayer.OnCompleted();
});
我希望序列 运行 直到自上次 "OnNext" 调用以来的给定时间,然后完成。这在各种蓝牙通信场景中非常有用,在这些场景中,蓝牙设备会给我一个数据序列,然后在没有任何类型的完成消息或事件的情况下停止。在这些场景中,我需要启发式地确定序列何时完成,然后自己完成。因此,如果自上次蓝牙通知以来已经 "too long",我想完成 ReplaySubject。
我可以通过创建一个计时器,在收到每个元素时重置它,然后在计时器达到 "too long" 时完成 ReplaySubject 来完成此操作,但我听说创建一个对象并从在可观察的订阅中不是线程安全的。
关于如何在 "too long" 间隔后完成序列的任何建议?
据我所知,这是一个线程不安全的版本,但应该可以正常工作:
bool reading = true;
System.Timers.Timer timer = new System.Timers.Timer(1000);
timer.Elapsed += (sender, e) =>
{
reading = false;
};
ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
characteristic.WhenNotificationReceived()
.TakeWhile(x => reading)
.Subscribe(
onNext: result =>
{
string nextTag = BitConverter.ToString(result.Data);
nextTag = nextTag.Replace("-", "");
timer.Stop();
timer.Start();
rfidPlayer.OnNext(nextTag);
},
onCompleted: () =>
{
rfidPlayer.OnCompleted();
});
根据 Simonare 的第一个回答,这似乎令人满意:
characteristic.WhenNotificationReceived()
.Timeout(TimeSpan.FromSeconds(1))
.Subscribe(
onNext: result =>
{
string nextTag = BitConverter.ToString(result.Data);
nextTag = nextTag.Replace("-", "");
rfidPlayer.OnNext(nextTag);
},
onError: error =>
{
rfidPlayer.OnCompleted();
});
您可以考虑使用 Timeout Operator 。唯一的缺点是它以错误信号终止。您可能需要处理假错误
The Timeout operator allows you to abort an Observable with an onError termination if that Observable fails to emit any items during a specified span of time.
如果你使用下面的方法你可以超越错误
.Timeout(200, Promise.resolve(42));
Another variant allows you to instruct timeout to switch to a backup Observable that you specify, rather than terminating with an error, if the timeout condition is triggered.
characteristic.WhenNotificationReceived()
.Timeout(TimeSpan.FromSeconds(1))
.Subscribe(
onNext: result =>
{
....
rfidPlayer.OnNext(....);
},
onError: error =>
{
rfidPlayer.OnCompleted();
});
我觉得使用 Timeout
很讨厌,因为有例外。
我更喜欢在序列中注入一个值,我可以用它来终止序列。例如,如果我的序列产生非负数,那么如果我注入一个 -1
我知道结束序列。
这是一个例子:
从这个从 1 开始生成 2 的幂的 observable 开始,它还会将每个值的生成延迟等待值的毫秒数。
Observable
.Generate(1, x => true, x => 2 * x, x => x, x => TimeSpan.FromMilliseconds(x))
所以 1、2、4、8 等,越来越慢。
现在我想停止这个序列,如果在 3.0
秒内没有值,那么我可以这样做:
.Select(x => Observable.Timer(TimeSpan.FromSeconds(3.0)).Select(y => -1).StartWith(x))
.Switch()
.TakeWhile(x => x >= 0)
如果我运行这个序列我得到这个输出:
1 2 4 8 16 32 64 128 256 512 1024 2048
序列即将生成 4096
但它首先等待 4096
毫秒来生成该值 - 同时 Observable.Timer(TimeSpan.FromSeconds(3.0))
触发并输出 -1
因此停止序列。
这个查询的关键部分是Switch
的使用。它采用 IObservable<IObservable<T>>
并通过仅订阅最新的外部可观察对象并取消订阅前一个对象来生成 IObservable<T>
。
因此,在我的查询中,序列产生的每个新值都会停止并重新启动 Timer
。
在您的情况下,您的可观察对象将如下所示:
characteristic
.WhenNotificationReceived()
.Select(result => BitConverter.ToString(result.Data).Replace("-", ""))
.Select(x => Observable.Timer(TimeSpan.FromSeconds(1.0)).Select(y => (string)null).StartWith(x))
.Switch()
.TakeWhile(x => x != null)
.Subscribe(rfidPlayer);
这是您可以使用的自定义运算符 TakeUntilTimeout
,它是内置 Timeout
运算符之上的薄层。
/// <summary>
/// Applies a timeout policy for each element in the observable sequence.
/// If the next element isn't received within the specified timeout duration
/// starting from its predecessor, the sequence terminates.
/// </summary>
public static IObservable<T> TakeUntilTimeout<T>(
this IObservable<T> source,
TimeSpan timeout)
{
return source.Timeout(timeout, Observable.Empty<T>());
}