如何根据涉及第一个元素的谓词转换可观察对象

How to transform observable based on predicate involving first element

我正在尝试创建一个 Rx.NET 运算符,它接受 Observable<string> 和:

例如:

-a-b-c-d-|- --> -a-b-c-d-|-

-b-c-d-|- --> -|-

我该怎么做?

这是一种方法:

/// <summary>
/// If the first element has the expected value, return the whole sequence.
/// Otherwise, return an empty sequence.
/// </summary>
public static IObservable<T> IfFirstElement<T>(this IObservable<T> source,
    T expectedFirstElement, IEqualityComparer<T> comparer = default)
{
    comparer ??= EqualityComparer<T>.Default;
    return source.Publish(published =>
        published
            .Where(x => !comparer.Equals(x, expectedFirstElement))
            .Take(1)
            .IgnoreElements()
            .Amb(published)
    );
}

此实现使用 Amb 运算符(“ambiguous”的缩写),它采用两个序列并传播最先做出反应的序列。

  1. 如果第一个元素有想要的值,第一个序列(published.Where+Take+IgnoreElements)不反应,所以传播第二个序列(published,这是整个序列)。此时第一个序列已取消订阅,因此不会为后续元素调用 comparer.Equals 方法。
  2. 如果第一个元素没有所需的值,第一个序列发出完成通知,由 Amb 运算符传播,第二个序列(整个序列)被忽略。

用法示例:

IObservable<string> original = new string[] { "a", "b", "c", "d" }.ToObservable();
IObservable<string> transformed = original.IfFirstElement("a");

注意:此实现基于这样的假设:当两个序列同时反应时,Amb 运算符始终选择第一个序列。文档中没有提到这一点,它只说明 Amb 运算符使用并行处理来检测哪个序列产生第一项”source code 非常复杂,所以我不能通过阅读来推导出这个保证。如果你想要更可靠的东西,你可以试试这个实现:

return Observable.Create<T>(observer =>
{
    bool first = true;
    return source.Subscribe(item =>
    {
        if (first)
        {
            first = false;
            if (!comparer.Equals(item, expectedFirstElement))
            {
                observer.OnCompleted(); return;
            }
        }
        observer.OnNext(item);
    }, observer.OnError, observer.OnCompleted);
});

这是一个绝对没有竞争条件的版本:

public static IObservable<T> IfFirstElement<T>(this IObservable<T> source, T expectedFirstElement) =>
    source
        .Publish(published =>
            from x in published.Take(1)
            from y in
                x.Equals(expectedFirstElement)
                ? published.StartWith(x)
                : Observable.Empty<T>()
            select y);

方法语法版本:

public static IObservable<T> IfFirstElement<T>(this IObservable<T> source, T expectedFirstElement) =>
    source
        .Publish(published =>
            published
                .Take(1)
                .SelectMany(x =>
                    x.Equals(expectedFirstElement)
                    ? published.StartWith(x)
                    : Observable.Empty<T>()));

我更喜欢查询语法,但是嘿...