Rx.Net 忽略某些值直到它们在一定时间内相同的运算符
Rx.Net Operator that ignores some values until they are identical during a certain amount of time
我正在尝试创建一个具有以下行为的 Rx.Net 运算符:
- 当一个事件是“正常类型”时,return直接该事件
- 当事件属于“特殊类型”时,等待您在一定时间内收到重复的此类事件
我想要像下面这样的大理石。
当消息类型为A或B时,我们直接发送。当消息是 C 时,我们要确保它不仅仅是一个传递状态,并且只有在一定时间内是这样的情况下才发送它。这是通过我们收到的第一个 C
和当前 C
之间的时间来衡量的。在特定的时间后,所有 C
都被“接受”,我们将它们作为正常的通过。
这是当我们收到 C
时的样子,它只是可传递的,我们想忽略它。
我曾尝试使用 Scan
运算符做一些事情,当我有特定值时使用 returning previous/current 值,但感觉真的很糟糕。
这是我编写的一些代码,用于演示我的尝试。在那种情况下,“特殊类型”就是当值为 999 时,但在运算符中,我想做的可能是另一个测试,甚至是传递给我的运算符的函数。
var oneObservable = Observable.Interval(TimeSpan.FromSeconds(1)).Select(_ => 999);
var intObservable = Observable.Interval(TimeSpan.FromSeconds(1)).Select(value => (int)value);
var myObservable = intObservable.Take(4).Concat(oneObservable.Take(3)).Timestamp().Repeat();
var test = myObservable.Scan(
(previous: default(Timestamped<int>), current: default(Timestamped<int>)),
(accumulated, current) =>
{
if (current.Value == 999)
{
if (accumulated.previous.Value != 999)
{
return (accumulated.current, current);
}
return (accumulated.previous, current);
}
else if(accumulated.current.Value == 999){
return (accumulated.previous, current);
}
return (accumulated.current, current);
})
.Where(
value => value.current.Value != 999
|| (value.previous.Value == 999 && value.current.Timestamp - value.previous.Timestamp > TimeSpan.FromSeconds(1.5)))
.Select(value => value.current);
这是自定义 IgnoreNonEstablishedContiguousValue
运算符的简单实现,具有理想的功能:
/// <summary>
/// Ignores elements having a specific value, until this value has
/// been repeated contiguously for a specific duration.
/// </summary>
public static IObservable<T> IgnoreNonEstablishedContiguousValue<T>(
this IObservable<T> source,
T value,
TimeSpan dueTimeUntilEstablished,
IEqualityComparer<T> comparer = default,
IScheduler scheduler = default)
{
// Arguments validation omitted
comparer ??= EqualityComparer<T>.Default;
scheduler ??= Scheduler.Default;
return Observable.Defer(() =>
{
IStopwatch stopwatch = null;
return source.Do(item =>
{
if (comparer.Equals(item, value))
stopwatch ??= scheduler.StartStopwatch();
else
stopwatch = null;
})
.Where(_ => stopwatch == null || stopwatch.Elapsed >= dueTimeUntilEstablished);
});
}
此实现基于 Do
和 Where
运算符。我不太喜欢使用 Scan
运算符作为构建块,因为它会导致代码冗长且可读性较差恕我直言。 Observable.Defer
包装器的目的是隔离每个订阅的状态。
用法示例:
var oneObservable = Observable.Interval(TimeSpan.FromSeconds(1)).Select(_ => 999);
var intObservable = Observable.Interval(TimeSpan.FromSeconds(1)).Select(v => (int)v);
IObservable<int> myObservable = intObservable.Take(4).Concat(oneObservable.Take(3))
.Repeat()
.IgnoreNonEstablishedContiguousValue(999, TimeSpan.FromSeconds(1.5));
这是我想出的:
public static IObservable<T> FilterSpecials<T>(this IObservable<T> source,
Func<T, bool> specialDetector,
TimeSpan timeUntilEstablished,
IScheduler scheduler = default)
{
return source.FilterSpecials(specialDetector, Observable.Timer(timeUntilEstablished), scheduler);
}
public static IObservable<T> FilterSpecials<T, U>(this IObservable<T> source,
Func<T, bool> specialDetector,
IObservable<U> observeSpecialsUntilEstablished,
IScheduler scheduler = default)
{
scheduler = scheduler ?? Scheduler.Default;
return source
.Select(i => (value: i, isSpecial: specialDetector(i)))
.StartWith((value: default(T), isSpecial: false))
.Publish(_source => _source
.Zip(_source.Skip(1))
.Select((t, index) => (
newValue: t.Second.value,
isNewValueSpecial: t.Second.isSpecial,
isPreviousValueSpecial: t.First.isSpecial,
isFirstElement: index == 0)
)
.SelectMany((t, index) => t.isNewValueSpecial
? (t.isFirstElement || !t.isPreviousValueSpecial)
? _source.SkipUntil(observeSpecialsUntilEstablished).TakeWhile(i => i.isSpecial).Select(i => i.value)
: Observable.Empty<T>()
: Observable.Return(t.newValue)
));
}
核心问题是,对于第一个特殊值,您想暂时停止收听常规 source
可观察对象,并切换到如下所示的内容:
source.SkipUntil(observeSpecialsUntilEstablished).TakeWhile(i => i.isSpecial).Select(i => i.value)
当你有那种特殊的可观察听力时,你可以忽略你的常规听力并发出 Observable.Empty<T>()
。当您没有特殊值时,您实际上是在执行 source.SelectMany(i => Observable.Return(i))
,这是对 return source
.
的空操作
其他都是 window-dressing:Zip
、Publish
和 StartWith
可以很容易地与以前的值进行比较。如果您愿意,可以将其抽象掉。将所有内容都放入那个命名的元组中是为了帮助进行自我记录,并防止重新调用 specialDetector
,以防它是一项昂贵的操作。
我正在尝试创建一个具有以下行为的 Rx.Net 运算符:
- 当一个事件是“正常类型”时,return直接该事件
- 当事件属于“特殊类型”时,等待您在一定时间内收到重复的此类事件
我想要像下面这样的大理石。
当消息类型为A或B时,我们直接发送。当消息是 C 时,我们要确保它不仅仅是一个传递状态,并且只有在一定时间内是这样的情况下才发送它。这是通过我们收到的第一个 C
和当前 C
之间的时间来衡量的。在特定的时间后,所有 C
都被“接受”,我们将它们作为正常的通过。
这是当我们收到 C
时的样子,它只是可传递的,我们想忽略它。
我曾尝试使用 Scan
运算符做一些事情,当我有特定值时使用 returning previous/current 值,但感觉真的很糟糕。
这是我编写的一些代码,用于演示我的尝试。在那种情况下,“特殊类型”就是当值为 999 时,但在运算符中,我想做的可能是另一个测试,甚至是传递给我的运算符的函数。
var oneObservable = Observable.Interval(TimeSpan.FromSeconds(1)).Select(_ => 999);
var intObservable = Observable.Interval(TimeSpan.FromSeconds(1)).Select(value => (int)value);
var myObservable = intObservable.Take(4).Concat(oneObservable.Take(3)).Timestamp().Repeat();
var test = myObservable.Scan(
(previous: default(Timestamped<int>), current: default(Timestamped<int>)),
(accumulated, current) =>
{
if (current.Value == 999)
{
if (accumulated.previous.Value != 999)
{
return (accumulated.current, current);
}
return (accumulated.previous, current);
}
else if(accumulated.current.Value == 999){
return (accumulated.previous, current);
}
return (accumulated.current, current);
})
.Where(
value => value.current.Value != 999
|| (value.previous.Value == 999 && value.current.Timestamp - value.previous.Timestamp > TimeSpan.FromSeconds(1.5)))
.Select(value => value.current);
这是自定义 IgnoreNonEstablishedContiguousValue
运算符的简单实现,具有理想的功能:
/// <summary>
/// Ignores elements having a specific value, until this value has
/// been repeated contiguously for a specific duration.
/// </summary>
public static IObservable<T> IgnoreNonEstablishedContiguousValue<T>(
this IObservable<T> source,
T value,
TimeSpan dueTimeUntilEstablished,
IEqualityComparer<T> comparer = default,
IScheduler scheduler = default)
{
// Arguments validation omitted
comparer ??= EqualityComparer<T>.Default;
scheduler ??= Scheduler.Default;
return Observable.Defer(() =>
{
IStopwatch stopwatch = null;
return source.Do(item =>
{
if (comparer.Equals(item, value))
stopwatch ??= scheduler.StartStopwatch();
else
stopwatch = null;
})
.Where(_ => stopwatch == null || stopwatch.Elapsed >= dueTimeUntilEstablished);
});
}
此实现基于 Do
和 Where
运算符。我不太喜欢使用 Scan
运算符作为构建块,因为它会导致代码冗长且可读性较差恕我直言。 Observable.Defer
包装器的目的是隔离每个订阅的状态。
用法示例:
var oneObservable = Observable.Interval(TimeSpan.FromSeconds(1)).Select(_ => 999);
var intObservable = Observable.Interval(TimeSpan.FromSeconds(1)).Select(v => (int)v);
IObservable<int> myObservable = intObservable.Take(4).Concat(oneObservable.Take(3))
.Repeat()
.IgnoreNonEstablishedContiguousValue(999, TimeSpan.FromSeconds(1.5));
这是我想出的:
public static IObservable<T> FilterSpecials<T>(this IObservable<T> source,
Func<T, bool> specialDetector,
TimeSpan timeUntilEstablished,
IScheduler scheduler = default)
{
return source.FilterSpecials(specialDetector, Observable.Timer(timeUntilEstablished), scheduler);
}
public static IObservable<T> FilterSpecials<T, U>(this IObservable<T> source,
Func<T, bool> specialDetector,
IObservable<U> observeSpecialsUntilEstablished,
IScheduler scheduler = default)
{
scheduler = scheduler ?? Scheduler.Default;
return source
.Select(i => (value: i, isSpecial: specialDetector(i)))
.StartWith((value: default(T), isSpecial: false))
.Publish(_source => _source
.Zip(_source.Skip(1))
.Select((t, index) => (
newValue: t.Second.value,
isNewValueSpecial: t.Second.isSpecial,
isPreviousValueSpecial: t.First.isSpecial,
isFirstElement: index == 0)
)
.SelectMany((t, index) => t.isNewValueSpecial
? (t.isFirstElement || !t.isPreviousValueSpecial)
? _source.SkipUntil(observeSpecialsUntilEstablished).TakeWhile(i => i.isSpecial).Select(i => i.value)
: Observable.Empty<T>()
: Observable.Return(t.newValue)
));
}
核心问题是,对于第一个特殊值,您想暂时停止收听常规 source
可观察对象,并切换到如下所示的内容:
source.SkipUntil(observeSpecialsUntilEstablished).TakeWhile(i => i.isSpecial).Select(i => i.value)
当你有那种特殊的可观察听力时,你可以忽略你的常规听力并发出 Observable.Empty<T>()
。当您没有特殊值时,您实际上是在执行 source.SelectMany(i => Observable.Return(i))
,这是对 return source
.
其他都是 window-dressing:Zip
、Publish
和 StartWith
可以很容易地与以前的值进行比较。如果您愿意,可以将其抽象掉。将所有内容都放入那个命名的元组中是为了帮助进行自我记录,并防止重新调用 specialDetector
,以防它是一项昂贵的操作。