在前一个元素匹配条件后获取流的第一个元素

Take first elements of stream after previous element matches condition

我是响应式扩展 (rx) 的新手,并尝试在 .NET 中执行以下操作(但在 JS 和其他语言中应该相同。):

我有一个传入的流,其中包含一个字符串和一个布尔值 属性。流将是无限的。我有以下条件:

示例:

("one", false)--("two", true)--("three", false)--("four", false)--("five", true)--("six", true)--("seven", true)--("eight", false)--("nine", true)--("ten", false)

预期结果:

"one"--"three"--"six"--"seven"--"eight"--"ten"

请注意 "six" 和 "seven" 已被打印,因为它们跟随 属性 设置为 true 的对象,即使它们自己的 属性 也设置为"true".

用于测试的简单 .NET 程序:

using System;
using System.Reactive.Linq;
using System.Threading;

namespace ConsoleApp1
{
    class Program
    {
        class Result
        {
            public bool Flag { get; set; }
            public string Text { get; set; }
        }

        static void Main(string[] args)
        {               
            var source =
               Observable.Create<Result>(f =>
               {
                   f.OnNext(new Result() { Text = "one", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "two", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "three", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "four", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "five", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "six", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "seven", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "eight", Flag = false });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "nine", Flag = true });
                   Thread.Sleep(1000);
                   f.OnNext(new Result() { Text = "ten", Flag = false });

                   return () => Console.WriteLine("Observer has unsubscribed");
               });
        }
    }
}

我尝试使用 .Scan.Buffer 扩展,但我不知道如何在我的场景中使用它们。

性能当然应该尽可能好,因为最后流将是无限的。

这是我用 TypeScript 编写代码的方式

const printItem = (print: boolean) => {
    return (data) => {
        if (print) {
            console.log(data);
        }
    };
}

let printItemFunction = printItem(true);

from(data)
.pipe(
    tap(item => printItemFunction(item.data)),
    tap(item => printItemFunction = printItem(item.printBool))
)
.subscribe()

基本思想是使用更高级别的函数,printItem,returns 一个知道是否打印以及打印什么的函数。返回的函数存储在一个变量中,printItemFunction.

对于源 Observable 发出的每个项目,要做的第一件事是执行 printItemFunction 传递源 Observable 通知的数据。

第二件事是计算 printItem 函数并将结果存储在变量 printItemFunction 中,以便为以下通知做好准备。

在程序开始时,printItemFunction被初始化为true,因此总是打印第一项。

我对C#不熟悉,给你一个.NET的答案

感谢@Picci 的回答,我找到了一种方法:

Func<bool, Action<Result>> printItem = print =>
                {
                    return data => {
                        if(print)
                        {
                            Console.WriteLine(data.Text);
                        }
                    };
                };

var printItemFunction = printItem(true);

source.Do(item => printItemFunction(item))
      .Do(item => printItemFunction = printItem(item.Flag))
      .Subscribe();

但是,我不太确定这是否是最好的方法,因为不使用 Subscribe() 而是使用副作用对我来说似乎有点奇怪。最后我不仅想打印这些值,还想用它调用 Webservice。

试试这个方法:

var results = new[]
{
    new Result() { Text = "one", Flag = false },
    new Result() { Text = "two", Flag = true },
    new Result() { Text = "three", Flag = false },
    new Result() { Text = "four", Flag = false },
    new Result() { Text = "five", Flag = true },
    new Result() { Text = "six", Flag = true },
    new Result() { Text = "seven", Flag = true },
    new Result() { Text = "eight", Flag = false },
    new Result() { Text = "nine", Flag = true },
    new Result() { Text = "ten", Flag = false },
};

IObservable<Result> source =
    Observable
        .Generate(
            0, x => x < results.Length, x => x + 1,
            x => results[x],
            x => TimeSpan.FromSeconds(1.0));

以上只是以比您的 Observable.Create<Result> 方法更惯用的方式生成 source

现在是 query:

IObservable<Result> query =
    source
        .StartWith(new Result() { Flag = true })
        .Publish(ss =>
            ss
                .Skip(1)
                .Zip(ss, (s1, s0) =>
                    s0.Flag
                    ? Observable.Return(s1) 
                    : Observable.Empty<Result>())
                .Merge());

这里使用.Publish允许源observable只有一个订阅,但是在.Publish方法中可以多次使用。然后可以使用标准 Skip(1).Zip 方法来检查正在生成的后续值。

这是输出:


在 Shlomo 的启发下,这是我使用 .Buffer(2, 1) 的方法:

IObservable<Result> query2 =
    source
        .StartWith(new Result() { Flag = true })
        .Buffer(2, 1)
        .Where(rs => rs.First().Flag)
        .SelectMany(rs => rs.Skip(1));

这里有多种方法可以做到:

var result1 = source.Publish(_source => _source
    .Zip(_source.Skip(1), (older, newer) => (older, newer))
    .Where(t => t.older.Flag == true)
    .Select(t => t.newer)
    .Merge(_source.Take(1))
    .Select(r => r.Text)
);

var result2 = source.Publish(_source => _source
    .Buffer(2, 1)
    .Where(l => l[0].Flag == true)
    .Select(l => l[1])
    .Merge(_source.Take(1))
    .Select(l => l.Text)
);

var result3 = source.Publish(_source => _source
    .Window(2, 1)
    .SelectMany(w => w
        .TakeWhile((r, index) => (index == 0 && r.Flag) || index == 1)
        .Skip(1)
    )
    .Merge(_source.Take(1))
    .Select(l => l.Text)
);

var result4 = source
    .Scan((result: new Result {Flag = true, Text = null}, emit: false), (state, r) => (r, state.result.Flag))
    .Where(t => t.emit)
    .Select(t => t.result.Text);

我偏爱扫描版,但实际上,取决于您。