Rx.Net 忽略某些值直到它们在一定时间内相同的运算符

Rx.Net Operator that ignores some values until they are identical during a certain amount of time

我正在尝试创建一个具有以下行为的 Rx.Net 运算符:

我想要像下面这样的大理石。

当消息类型为A或B时,我们直接发送。当消息是 C 时,我们要确保它不仅仅是一个传递状态,并且只有在一定时间内是这样的情况下才发送它。这是通过我们收到的第一个 C 和当前 C 之间的时间来衡量的。在特定的时间后,所有 C 都被“接受”,我们将它们作为正常的通过。

这是当我们收到 C 时的样子,它只是可传递的,我们想忽略它。

我曾尝试使用 Scan 运算符做一些事情,当我有特定值时使用 returning previous/current 值,但感觉真的很糟糕。

这是我编写的一些代码,用于演示我的尝试。在那种情况下,“特殊类型”就是当值为 999 时,但在运算符中,我想做的可能是另一个测试,甚至是传递给我的运算符的函数。

var oneObservable = Observable.Interval(TimeSpan.FromSeconds(1)).Select(_ => 999);
var intObservable = Observable.Interval(TimeSpan.FromSeconds(1)).Select(value => (int)value);

var myObservable = intObservable.Take(4).Concat(oneObservable.Take(3)).Timestamp().Repeat();

var test = myObservable.Scan(
    (previous: default(Timestamped<int>), current: default(Timestamped<int>)),
    (accumulated, current) =>
    {
        if (current.Value == 999)
        {
            if (accumulated.previous.Value != 999)
            {
                return (accumulated.current, current);
            }
            
            return (accumulated.previous, current);
        }
        else if(accumulated.current.Value == 999){
            return (accumulated.previous, current);
        }

        return (accumulated.current, current);
    })
    .Where(
        value => value.current.Value != 999 
        || (value.previous.Value == 999 && value.current.Timestamp - value.previous.Timestamp > TimeSpan.FromSeconds(1.5)))
    .Select(value => value.current);

这是自定义 IgnoreNonEstablishedContiguousValue 运算符的简单实现,具有理想的功能:

/// <summary>
/// Ignores elements having a specific value, until this value has
/// been repeated contiguously for a specific duration.
/// </summary>
public static IObservable<T> IgnoreNonEstablishedContiguousValue<T>(
    this IObservable<T> source,
    T value,
    TimeSpan dueTimeUntilEstablished,
    IEqualityComparer<T> comparer = default,
    IScheduler scheduler = default)
{
    // Arguments validation omitted
    comparer ??= EqualityComparer<T>.Default;
    scheduler ??= Scheduler.Default;
    return Observable.Defer(() =>
    {
        IStopwatch stopwatch = null;
        return source.Do(item =>
        {
            if (comparer.Equals(item, value))
                stopwatch ??= scheduler.StartStopwatch();
            else
                stopwatch = null;
        })
        .Where(_ => stopwatch == null || stopwatch.Elapsed >= dueTimeUntilEstablished);
    });
}

此实现基于 DoWhere 运算符。我不太喜欢使用 Scan 运算符作为构建块,因为它会导致代码冗长且可读性较差恕我直言。 Observable.Defer 包装器的目的是隔离每个订阅的状态。

用法示例:

var oneObservable = Observable.Interval(TimeSpan.FromSeconds(1)).Select(_ => 999);
var intObservable = Observable.Interval(TimeSpan.FromSeconds(1)).Select(v => (int)v);

IObservable<int> myObservable = intObservable.Take(4).Concat(oneObservable.Take(3))
    .Repeat()
    .IgnoreNonEstablishedContiguousValue(999, TimeSpan.FromSeconds(1.5));

这是我想出的:

public static IObservable<T> FilterSpecials<T>(this IObservable<T> source,
    Func<T, bool> specialDetector,
    TimeSpan timeUntilEstablished,
    IScheduler scheduler = default)
{
    return source.FilterSpecials(specialDetector, Observable.Timer(timeUntilEstablished), scheduler);
}

public static IObservable<T> FilterSpecials<T, U>(this IObservable<T> source,
    Func<T, bool> specialDetector,
    IObservable<U> observeSpecialsUntilEstablished,
    IScheduler scheduler = default)
{
    scheduler = scheduler ?? Scheduler.Default;

    return source
        .Select(i => (value: i, isSpecial: specialDetector(i)))
        .StartWith((value: default(T), isSpecial: false))
        .Publish(_source => _source
            .Zip(_source.Skip(1))
            .Select((t, index) => (
                newValue: t.Second.value,
                isNewValueSpecial: t.Second.isSpecial,
                isPreviousValueSpecial: t.First.isSpecial,
                isFirstElement: index == 0)
            )
            .SelectMany((t, index) => t.isNewValueSpecial
                ? (t.isFirstElement || !t.isPreviousValueSpecial)       
                    ? _source.SkipUntil(observeSpecialsUntilEstablished).TakeWhile(i => i.isSpecial).Select(i => i.value)   
                    : Observable.Empty<T>()
                : Observable.Return(t.newValue)
        ));
}

核心问题是,对于第一个特殊值,您想暂时停止收听常规 source 可观察对象,并切换到如下所示的内容:

source.SkipUntil(observeSpecialsUntilEstablished).TakeWhile(i => i.isSpecial).Select(i => i.value)

当你有那种特殊的可观察听力时,你可以忽略你的常规听力并发出 Observable.Empty<T>()。当您没有特殊值时,您实际上是在执行 source.SelectMany(i => Observable.Return(i)),这是对 return source.

的空操作

其他都是 window-dressing:ZipPublishStartWith 可以很容易地与以前的值进行比较。如果您愿意,可以将其抽象掉。将所有内容都放入那个命名的元组中是为了帮助进行自我记录,并防止重新调用 specialDetector,以防它是一项昂贵的操作。