如何传播一个可观察的序列,但前提是它以特定的值集合开始?

How to propagate an observable sequence, but only if it starts with a specific collection of values?

这是一个“纯属娱乐”的问题,由 jackdry 扩展和概括了 。这里的问题是如何实现一个接受 IObservable<T> 和:

的 Rx 运算符
  1. 如果序列的第一个元素与给定集合中的元素相等且顺序相同,则转发每个元素不变 (ICollection<T>)。
  2. 否则发出空序列。

例如给定值集合 [a, b, c]:

Source sequence: +--a---b----c-----d--e----|
Expected result: +-----------abc---d--e----|
Source sequence: +----a---p----q---r-----|
Expected result: +--------|
Source sequence: +------a------b------|
Expected result: +--------------------|
Source sequence: +---c---a----b--c---d---|
Expected result: +---|

请求的操作员签名:

public static IObservable<T> IfFirstElements<T>(
    this IObservable<T> source,
    ICollection<T> expectedFirstElements,
    IEqualityComparer<T> comparer = default);

给你:

public static IObservable<T> IfFirstElements<T>(
    this IObservable<T> source,
    ICollection<T> expectedFirstElements,
    IEqualityComparer<T> comparer = default) =>
source
    .Publish(published =>
        from xs in published.Take(expectedFirstElements.Count).ToArray()
        from y in
            xs.SequenceEqual(expectedFirstElements, comparer)
            ? xs.ToObservable(Scheduler.Immediate).Concat(published)
            : Observable.Empty<T>()
        select y);

我试图通过尽早失败来提高效率,但每次尝试都会降低效率。

这是我的测试代码:

new[] { 1, 2, 3, 4 }
    .ToObservable()
    .IfFirstElements(new[] { 1, 2, 3 })
    .Dump();

Observable
    .Interval(TimeSpan.FromSeconds(1.0))
    .Take(7)
    .Select(x => x + 1)
    .IfFirstElements(new long[] { 1, 2, 3 })
    .Dump();

new[] { 2, 2, 3, 4 }
    .ToObservable()
    .IfFirstElements(new[] { 1, 2, 3 })
    .Dump();

LINQPad 需要 运行 上面的代码才能得到这个输出:


让它提前结束有点困难,但这里是:

public static IObservable<T> IfFirstElements<T>(
    this IObservable<T> source,
    ICollection<T> expectedFirstElements,
    IEqualityComparer<T> comparer = default) =>
        source
            .Publish(published =>
                from xs in
                    published
                        .Scan(ImmutableList.Create<T>(), (a, b) => a.Add(b))
                        .TakeUntil(a => a.Zip(expectedFirstElements, (m, n) => comparer == null ? m.Equals(n) : comparer.Equals(m, n)).Any(c => !c))
                        .Take(expectedFirstElements.Count)
                        .LastAsync()
                from y in
                    xs.SequenceEqual(expectedFirstElements, comparer)
                    ? xs.ToObservable(Scheduler.Immediate).Concat(published)
                    : Observable.Empty<T>()
                select y);

这是另一种方法。这个类似于 Enigmativity 的 ,并且使用几乎相同的运算符。不同之处在于,当 source 序列发出一个元素时,仅检查此元素是否与 expectedFirstElements 集合的相应元素相等,使其成为 O(n) 算法。

/// <summary>
/// If the first elements have the expected values, return the whole sequence.
/// Otherwise, return an empty sequence.
/// </summary>
public static IObservable<T> IfFirstElements<T>(this IObservable<T> source,
    ICollection<T> expectedFirstElements, IEqualityComparer<T> comparer = default)
{
    comparer ??= EqualityComparer<T>.Default;
    return source.Publish(published => published
        .Zip(expectedFirstElements)
        .TakeWhile(e => comparer.Equals(e.First, e.Second))
        .Take(expectedFirstElements.Count)
        .Select(e => e.First)
        .ToList()
        .SelectMany(list => list.Count == expectedFirstElements.Count ?
            published.StartWith(list) : Observable.Empty<T>()));
}

附带说明一下,我的初衷是将 IEnumerable<T> 作为预期值的容器。我找不到一个解决方案,尽管它并没有急切地实现可枚举,所以我作弊并将问题改编为我手头的解决方案。因此容器的类型是 ICollection<T>.