如何根据涉及第一个元素的谓词转换可观察对象
How to transform observable based on predicate involving first element
我正在尝试创建一个 Rx.NET 运算符,它接受 Observable<string>
和:
- 如果第一个元素是
"a"
,则转发每个元素不变
- 否则只发出一个完成信号
例如:
-a-b-c-d-|- --> -a-b-c-d-|-
-b-c-d-|- --> -|-
我该怎么做?
这是一种方法:
/// <summary>
/// If the first element has the expected value, return the whole sequence.
/// Otherwise, return an empty sequence.
/// </summary>
public static IObservable<T> IfFirstElement<T>(this IObservable<T> source,
T expectedFirstElement, IEqualityComparer<T> comparer = default)
{
comparer ??= EqualityComparer<T>.Default;
return source.Publish(published =>
published
.Where(x => !comparer.Equals(x, expectedFirstElement))
.Take(1)
.IgnoreElements()
.Amb(published)
);
}
此实现使用 Amb
运算符(“ambiguous”的缩写),它采用两个序列并传播最先做出反应的序列。
- 如果第一个元素有想要的值,第一个序列(
published.Where
+Take
+IgnoreElements
)不反应,所以传播第二个序列(published
,这是整个序列)。此时第一个序列已取消订阅,因此不会为后续元素调用 comparer.Equals
方法。
- 如果第一个元素没有所需的值,第一个序列发出完成通知,由
Amb
运算符传播,第二个序列(整个序列)被忽略。
用法示例:
IObservable<string> original = new string[] { "a", "b", "c", "d" }.ToObservable();
IObservable<string> transformed = original.IfFirstElement("a");
注意:此实现基于这样的假设:当两个序列同时反应时,Amb
运算符始终选择第一个序列。文档中没有提到这一点,它只说明 “Amb
运算符使用并行处理来检测哪个序列产生第一项”。 source code 非常复杂,所以我不能通过阅读来推导出这个保证。如果你想要更可靠的东西,你可以试试这个实现:
return Observable.Create<T>(observer =>
{
bool first = true;
return source.Subscribe(item =>
{
if (first)
{
first = false;
if (!comparer.Equals(item, expectedFirstElement))
{
observer.OnCompleted(); return;
}
}
observer.OnNext(item);
}, observer.OnError, observer.OnCompleted);
});
这是一个绝对没有竞争条件的版本:
public static IObservable<T> IfFirstElement<T>(this IObservable<T> source, T expectedFirstElement) =>
source
.Publish(published =>
from x in published.Take(1)
from y in
x.Equals(expectedFirstElement)
? published.StartWith(x)
: Observable.Empty<T>()
select y);
方法语法版本:
public static IObservable<T> IfFirstElement<T>(this IObservable<T> source, T expectedFirstElement) =>
source
.Publish(published =>
published
.Take(1)
.SelectMany(x =>
x.Equals(expectedFirstElement)
? published.StartWith(x)
: Observable.Empty<T>()));
我更喜欢查询语法,但是嘿...
我正在尝试创建一个 Rx.NET 运算符,它接受 Observable<string>
和:
- 如果第一个元素是
"a"
,则转发每个元素不变
- 否则只发出一个完成信号
例如:
-a-b-c-d-|- --> -a-b-c-d-|-
-b-c-d-|- --> -|-
我该怎么做?
这是一种方法:
/// <summary>
/// If the first element has the expected value, return the whole sequence.
/// Otherwise, return an empty sequence.
/// </summary>
public static IObservable<T> IfFirstElement<T>(this IObservable<T> source,
T expectedFirstElement, IEqualityComparer<T> comparer = default)
{
comparer ??= EqualityComparer<T>.Default;
return source.Publish(published =>
published
.Where(x => !comparer.Equals(x, expectedFirstElement))
.Take(1)
.IgnoreElements()
.Amb(published)
);
}
此实现使用 Amb
运算符(“ambiguous”的缩写),它采用两个序列并传播最先做出反应的序列。
- 如果第一个元素有想要的值,第一个序列(
published.Where
+Take
+IgnoreElements
)不反应,所以传播第二个序列(published
,这是整个序列)。此时第一个序列已取消订阅,因此不会为后续元素调用comparer.Equals
方法。 - 如果第一个元素没有所需的值,第一个序列发出完成通知,由
Amb
运算符传播,第二个序列(整个序列)被忽略。
用法示例:
IObservable<string> original = new string[] { "a", "b", "c", "d" }.ToObservable();
IObservable<string> transformed = original.IfFirstElement("a");
注意:此实现基于这样的假设:当两个序列同时反应时,Amb
运算符始终选择第一个序列。文档中没有提到这一点,它只说明 “Amb
运算符使用并行处理来检测哪个序列产生第一项”。 source code 非常复杂,所以我不能通过阅读来推导出这个保证。如果你想要更可靠的东西,你可以试试这个实现:
return Observable.Create<T>(observer =>
{
bool first = true;
return source.Subscribe(item =>
{
if (first)
{
first = false;
if (!comparer.Equals(item, expectedFirstElement))
{
observer.OnCompleted(); return;
}
}
observer.OnNext(item);
}, observer.OnError, observer.OnCompleted);
});
这是一个绝对没有竞争条件的版本:
public static IObservable<T> IfFirstElement<T>(this IObservable<T> source, T expectedFirstElement) =>
source
.Publish(published =>
from x in published.Take(1)
from y in
x.Equals(expectedFirstElement)
? published.StartWith(x)
: Observable.Empty<T>()
select y);
方法语法版本:
public static IObservable<T> IfFirstElement<T>(this IObservable<T> source, T expectedFirstElement) =>
source
.Publish(published =>
published
.Take(1)
.SelectMany(x =>
x.Equals(expectedFirstElement)
? published.StartWith(x)
: Observable.Empty<T>()));
我更喜欢查询语法,但是嘿...