在前一个元素匹配条件后获取流的第一个元素
Take first elements of stream after previous element matches condition
我是响应式扩展 (rx) 的新手,并尝试在 .NET 中执行以下操作(但在 JS 和其他语言中应该相同。):
我有一个传入的流,其中包含一个字符串和一个布尔值 属性。流将是无限的。我有以下条件:
- 应该始终打印第一个对象。
- 现在应该跳过所有传入的对象,直到对象到达时 bool 属性 设置为 "true"。
- 当 bool 属性 设置为 "true" 的对象到达时,应跳过该对象但应打印下一个对象(无论属性是什么)。
- 现在以这种方式进行,应该打印 属性 设置为 true 的对象之后的每个对象。
示例:
("one", false)--("two", true)--("three", false)--("four", false)--("five", true)--("six", true)--("seven", true)--("eight", false)--("nine", true)--("ten", false)
预期结果:
"one"--"three"--"six"--"seven"--"eight"--"ten"
请注意 "six" 和 "seven" 已被打印,因为它们跟随 属性 设置为 true 的对象,即使它们自己的 属性 也设置为"true".
用于测试的简单 .NET 程序:
using System;
using System.Reactive.Linq;
using System.Threading;
namespace ConsoleApp1
{
class Program
{
class Result
{
public bool Flag { get; set; }
public string Text { get; set; }
}
static void Main(string[] args)
{
var source =
Observable.Create<Result>(f =>
{
f.OnNext(new Result() { Text = "one", Flag = false });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "two", Flag = true });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "three", Flag = false });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "four", Flag = false });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "five", Flag = true });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "six", Flag = true });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "seven", Flag = true });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "eight", Flag = false });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "nine", Flag = true });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "ten", Flag = false });
return () => Console.WriteLine("Observer has unsubscribed");
});
}
}
}
我尝试使用 .Scan
和 .Buffer
扩展,但我不知道如何在我的场景中使用它们。
性能当然应该尽可能好,因为最后流将是无限的。
这是我用 TypeScript 编写代码的方式
const printItem = (print: boolean) => {
return (data) => {
if (print) {
console.log(data);
}
};
}
let printItemFunction = printItem(true);
from(data)
.pipe(
tap(item => printItemFunction(item.data)),
tap(item => printItemFunction = printItem(item.printBool))
)
.subscribe()
基本思想是使用更高级别的函数,printItem
,returns 一个知道是否打印以及打印什么的函数。返回的函数存储在一个变量中,printItemFunction
.
对于源 Observable 发出的每个项目,要做的第一件事是执行 printItemFunction
传递源 Observable 通知的数据。
第二件事是计算 printItem
函数并将结果存储在变量 printItemFunction
中,以便为以下通知做好准备。
在程序开始时,printItemFunction
被初始化为true
,因此总是打印第一项。
我对C#不熟悉,给你一个.NET的答案
感谢@Picci 的回答,我找到了一种方法:
Func<bool, Action<Result>> printItem = print =>
{
return data => {
if(print)
{
Console.WriteLine(data.Text);
}
};
};
var printItemFunction = printItem(true);
source.Do(item => printItemFunction(item))
.Do(item => printItemFunction = printItem(item.Flag))
.Subscribe();
但是,我不太确定这是否是最好的方法,因为不使用 Subscribe() 而是使用副作用对我来说似乎有点奇怪。最后我不仅想打印这些值,还想用它调用 Webservice。
试试这个方法:
var results = new[]
{
new Result() { Text = "one", Flag = false },
new Result() { Text = "two", Flag = true },
new Result() { Text = "three", Flag = false },
new Result() { Text = "four", Flag = false },
new Result() { Text = "five", Flag = true },
new Result() { Text = "six", Flag = true },
new Result() { Text = "seven", Flag = true },
new Result() { Text = "eight", Flag = false },
new Result() { Text = "nine", Flag = true },
new Result() { Text = "ten", Flag = false },
};
IObservable<Result> source =
Observable
.Generate(
0, x => x < results.Length, x => x + 1,
x => results[x],
x => TimeSpan.FromSeconds(1.0));
以上只是以比您的 Observable.Create<Result>
方法更惯用的方式生成 source
。
现在是 query
:
IObservable<Result> query =
source
.StartWith(new Result() { Flag = true })
.Publish(ss =>
ss
.Skip(1)
.Zip(ss, (s1, s0) =>
s0.Flag
? Observable.Return(s1)
: Observable.Empty<Result>())
.Merge());
这里使用.Publish
允许源observable只有一个订阅,但是在.Publish
方法中可以多次使用。然后可以使用标准 Skip(1).Zip
方法来检查正在生成的后续值。
这是输出:
在 Shlomo 的启发下,这是我使用 .Buffer(2, 1)
的方法:
IObservable<Result> query2 =
source
.StartWith(new Result() { Flag = true })
.Buffer(2, 1)
.Where(rs => rs.First().Flag)
.SelectMany(rs => rs.Skip(1));
这里有多种方法可以做到:
var result1 = source.Publish(_source => _source
.Zip(_source.Skip(1), (older, newer) => (older, newer))
.Where(t => t.older.Flag == true)
.Select(t => t.newer)
.Merge(_source.Take(1))
.Select(r => r.Text)
);
var result2 = source.Publish(_source => _source
.Buffer(2, 1)
.Where(l => l[0].Flag == true)
.Select(l => l[1])
.Merge(_source.Take(1))
.Select(l => l.Text)
);
var result3 = source.Publish(_source => _source
.Window(2, 1)
.SelectMany(w => w
.TakeWhile((r, index) => (index == 0 && r.Flag) || index == 1)
.Skip(1)
)
.Merge(_source.Take(1))
.Select(l => l.Text)
);
var result4 = source
.Scan((result: new Result {Flag = true, Text = null}, emit: false), (state, r) => (r, state.result.Flag))
.Where(t => t.emit)
.Select(t => t.result.Text);
我偏爱扫描版,但实际上,取决于您。
我是响应式扩展 (rx) 的新手,并尝试在 .NET 中执行以下操作(但在 JS 和其他语言中应该相同。):
我有一个传入的流,其中包含一个字符串和一个布尔值 属性。流将是无限的。我有以下条件:
- 应该始终打印第一个对象。
- 现在应该跳过所有传入的对象,直到对象到达时 bool 属性 设置为 "true"。
- 当 bool 属性 设置为 "true" 的对象到达时,应跳过该对象但应打印下一个对象(无论属性是什么)。
- 现在以这种方式进行,应该打印 属性 设置为 true 的对象之后的每个对象。
示例:
("one", false)--("two", true)--("three", false)--("four", false)--("five", true)--("six", true)--("seven", true)--("eight", false)--("nine", true)--("ten", false)
预期结果:
"one"--"three"--"six"--"seven"--"eight"--"ten"
请注意 "six" 和 "seven" 已被打印,因为它们跟随 属性 设置为 true 的对象,即使它们自己的 属性 也设置为"true".
用于测试的简单 .NET 程序:
using System;
using System.Reactive.Linq;
using System.Threading;
namespace ConsoleApp1
{
class Program
{
class Result
{
public bool Flag { get; set; }
public string Text { get; set; }
}
static void Main(string[] args)
{
var source =
Observable.Create<Result>(f =>
{
f.OnNext(new Result() { Text = "one", Flag = false });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "two", Flag = true });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "three", Flag = false });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "four", Flag = false });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "five", Flag = true });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "six", Flag = true });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "seven", Flag = true });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "eight", Flag = false });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "nine", Flag = true });
Thread.Sleep(1000);
f.OnNext(new Result() { Text = "ten", Flag = false });
return () => Console.WriteLine("Observer has unsubscribed");
});
}
}
}
我尝试使用 .Scan
和 .Buffer
扩展,但我不知道如何在我的场景中使用它们。
性能当然应该尽可能好,因为最后流将是无限的。
这是我用 TypeScript 编写代码的方式
const printItem = (print: boolean) => {
return (data) => {
if (print) {
console.log(data);
}
};
}
let printItemFunction = printItem(true);
from(data)
.pipe(
tap(item => printItemFunction(item.data)),
tap(item => printItemFunction = printItem(item.printBool))
)
.subscribe()
基本思想是使用更高级别的函数,printItem
,returns 一个知道是否打印以及打印什么的函数。返回的函数存储在一个变量中,printItemFunction
.
对于源 Observable 发出的每个项目,要做的第一件事是执行 printItemFunction
传递源 Observable 通知的数据。
第二件事是计算 printItem
函数并将结果存储在变量 printItemFunction
中,以便为以下通知做好准备。
在程序开始时,printItemFunction
被初始化为true
,因此总是打印第一项。
我对C#不熟悉,给你一个.NET的答案
感谢@Picci 的回答,我找到了一种方法:
Func<bool, Action<Result>> printItem = print =>
{
return data => {
if(print)
{
Console.WriteLine(data.Text);
}
};
};
var printItemFunction = printItem(true);
source.Do(item => printItemFunction(item))
.Do(item => printItemFunction = printItem(item.Flag))
.Subscribe();
但是,我不太确定这是否是最好的方法,因为不使用 Subscribe() 而是使用副作用对我来说似乎有点奇怪。最后我不仅想打印这些值,还想用它调用 Webservice。
试试这个方法:
var results = new[]
{
new Result() { Text = "one", Flag = false },
new Result() { Text = "two", Flag = true },
new Result() { Text = "three", Flag = false },
new Result() { Text = "four", Flag = false },
new Result() { Text = "five", Flag = true },
new Result() { Text = "six", Flag = true },
new Result() { Text = "seven", Flag = true },
new Result() { Text = "eight", Flag = false },
new Result() { Text = "nine", Flag = true },
new Result() { Text = "ten", Flag = false },
};
IObservable<Result> source =
Observable
.Generate(
0, x => x < results.Length, x => x + 1,
x => results[x],
x => TimeSpan.FromSeconds(1.0));
以上只是以比您的 Observable.Create<Result>
方法更惯用的方式生成 source
。
现在是 query
:
IObservable<Result> query =
source
.StartWith(new Result() { Flag = true })
.Publish(ss =>
ss
.Skip(1)
.Zip(ss, (s1, s0) =>
s0.Flag
? Observable.Return(s1)
: Observable.Empty<Result>())
.Merge());
这里使用.Publish
允许源observable只有一个订阅,但是在.Publish
方法中可以多次使用。然后可以使用标准 Skip(1).Zip
方法来检查正在生成的后续值。
这是输出:
在 Shlomo 的启发下,这是我使用 .Buffer(2, 1)
的方法:
IObservable<Result> query2 =
source
.StartWith(new Result() { Flag = true })
.Buffer(2, 1)
.Where(rs => rs.First().Flag)
.SelectMany(rs => rs.Skip(1));
这里有多种方法可以做到:
var result1 = source.Publish(_source => _source
.Zip(_source.Skip(1), (older, newer) => (older, newer))
.Where(t => t.older.Flag == true)
.Select(t => t.newer)
.Merge(_source.Take(1))
.Select(r => r.Text)
);
var result2 = source.Publish(_source => _source
.Buffer(2, 1)
.Where(l => l[0].Flag == true)
.Select(l => l[1])
.Merge(_source.Take(1))
.Select(l => l.Text)
);
var result3 = source.Publish(_source => _source
.Window(2, 1)
.SelectMany(w => w
.TakeWhile((r, index) => (index == 0 && r.Flag) || index == 1)
.Skip(1)
)
.Merge(_source.Take(1))
.Select(l => l.Text)
);
var result4 = source
.Scan((result: new Result {Flag = true, Text = null}, emit: false), (state, r) => (r, state.result.Flag))
.Where(t => t.emit)
.Select(t => t.result.Text);
我偏爱扫描版,但实际上,取决于您。