如何传播一个可观察的序列,但前提是它以特定的值集合开始?
How to propagate an observable sequence, but only if it starts with a specific collection of values?
这是一个“纯属娱乐”的问题,由 jackdry 扩展和概括了 。这里的问题是如何实现一个接受 IObservable<T>
和:
的 Rx 运算符
- 如果序列的第一个元素与给定集合中的元素相等且顺序相同,则转发每个元素不变 (
ICollection<T>
)。
- 否则发出空序列。
例如给定值集合 [a, b, c]:
Source sequence: +--a---b----c-----d--e----|
Expected result: +-----------abc---d--e----|
Source sequence: +----a---p----q---r-----|
Expected result: +--------|
Source sequence: +------a------b------|
Expected result: +--------------------|
Source sequence: +---c---a----b--c---d---|
Expected result: +---|
请求的操作员签名:
public static IObservable<T> IfFirstElements<T>(
this IObservable<T> source,
ICollection<T> expectedFirstElements,
IEqualityComparer<T> comparer = default);
给你:
public static IObservable<T> IfFirstElements<T>(
this IObservable<T> source,
ICollection<T> expectedFirstElements,
IEqualityComparer<T> comparer = default) =>
source
.Publish(published =>
from xs in published.Take(expectedFirstElements.Count).ToArray()
from y in
xs.SequenceEqual(expectedFirstElements, comparer)
? xs.ToObservable(Scheduler.Immediate).Concat(published)
: Observable.Empty<T>()
select y);
我试图通过尽早失败来提高效率,但每次尝试都会降低效率。
这是我的测试代码:
new[] { 1, 2, 3, 4 }
.ToObservable()
.IfFirstElements(new[] { 1, 2, 3 })
.Dump();
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Take(7)
.Select(x => x + 1)
.IfFirstElements(new long[] { 1, 2, 3 })
.Dump();
new[] { 2, 2, 3, 4 }
.ToObservable()
.IfFirstElements(new[] { 1, 2, 3 })
.Dump();
LINQPad 需要 运行 上面的代码才能得到这个输出:
让它提前结束有点困难,但这里是:
public static IObservable<T> IfFirstElements<T>(
this IObservable<T> source,
ICollection<T> expectedFirstElements,
IEqualityComparer<T> comparer = default) =>
source
.Publish(published =>
from xs in
published
.Scan(ImmutableList.Create<T>(), (a, b) => a.Add(b))
.TakeUntil(a => a.Zip(expectedFirstElements, (m, n) => comparer == null ? m.Equals(n) : comparer.Equals(m, n)).Any(c => !c))
.Take(expectedFirstElements.Count)
.LastAsync()
from y in
xs.SequenceEqual(expectedFirstElements, comparer)
? xs.ToObservable(Scheduler.Immediate).Concat(published)
: Observable.Empty<T>()
select y);
这是另一种方法。这个类似于 Enigmativity 的 ,并且使用几乎相同的运算符。不同之处在于,当 source
序列发出一个元素时,仅检查此元素是否与 expectedFirstElements
集合的相应元素相等,使其成为 O(n) 算法。
/// <summary>
/// If the first elements have the expected values, return the whole sequence.
/// Otherwise, return an empty sequence.
/// </summary>
public static IObservable<T> IfFirstElements<T>(this IObservable<T> source,
ICollection<T> expectedFirstElements, IEqualityComparer<T> comparer = default)
{
comparer ??= EqualityComparer<T>.Default;
return source.Publish(published => published
.Zip(expectedFirstElements)
.TakeWhile(e => comparer.Equals(e.First, e.Second))
.Take(expectedFirstElements.Count)
.Select(e => e.First)
.ToList()
.SelectMany(list => list.Count == expectedFirstElements.Count ?
published.StartWith(list) : Observable.Empty<T>()));
}
附带说明一下,我的初衷是将 IEnumerable<T>
作为预期值的容器。我找不到一个解决方案,尽管它并没有急切地实现可枚举,所以我作弊并将问题改编为我手头的解决方案。因此容器的类型是 ICollection<T>
.
这是一个“纯属娱乐”的问题,由 jackdry 扩展和概括了 IObservable<T>
和:
- 如果序列的第一个元素与给定集合中的元素相等且顺序相同,则转发每个元素不变 (
ICollection<T>
)。 - 否则发出空序列。
例如给定值集合 [a, b, c]:
Source sequence: +--a---b----c-----d--e----|
Expected result: +-----------abc---d--e----|
Source sequence: +----a---p----q---r-----|
Expected result: +--------|
Source sequence: +------a------b------|
Expected result: +--------------------|
Source sequence: +---c---a----b--c---d---|
Expected result: +---|
请求的操作员签名:
public static IObservable<T> IfFirstElements<T>(
this IObservable<T> source,
ICollection<T> expectedFirstElements,
IEqualityComparer<T> comparer = default);
给你:
public static IObservable<T> IfFirstElements<T>(
this IObservable<T> source,
ICollection<T> expectedFirstElements,
IEqualityComparer<T> comparer = default) =>
source
.Publish(published =>
from xs in published.Take(expectedFirstElements.Count).ToArray()
from y in
xs.SequenceEqual(expectedFirstElements, comparer)
? xs.ToObservable(Scheduler.Immediate).Concat(published)
: Observable.Empty<T>()
select y);
我试图通过尽早失败来提高效率,但每次尝试都会降低效率。
这是我的测试代码:
new[] { 1, 2, 3, 4 }
.ToObservable()
.IfFirstElements(new[] { 1, 2, 3 })
.Dump();
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Take(7)
.Select(x => x + 1)
.IfFirstElements(new long[] { 1, 2, 3 })
.Dump();
new[] { 2, 2, 3, 4 }
.ToObservable()
.IfFirstElements(new[] { 1, 2, 3 })
.Dump();
LINQPad 需要 运行 上面的代码才能得到这个输出:
让它提前结束有点困难,但这里是:
public static IObservable<T> IfFirstElements<T>(
this IObservable<T> source,
ICollection<T> expectedFirstElements,
IEqualityComparer<T> comparer = default) =>
source
.Publish(published =>
from xs in
published
.Scan(ImmutableList.Create<T>(), (a, b) => a.Add(b))
.TakeUntil(a => a.Zip(expectedFirstElements, (m, n) => comparer == null ? m.Equals(n) : comparer.Equals(m, n)).Any(c => !c))
.Take(expectedFirstElements.Count)
.LastAsync()
from y in
xs.SequenceEqual(expectedFirstElements, comparer)
? xs.ToObservable(Scheduler.Immediate).Concat(published)
: Observable.Empty<T>()
select y);
这是另一种方法。这个类似于 Enigmativity 的 source
序列发出一个元素时,仅检查此元素是否与 expectedFirstElements
集合的相应元素相等,使其成为 O(n) 算法。
/// <summary>
/// If the first elements have the expected values, return the whole sequence.
/// Otherwise, return an empty sequence.
/// </summary>
public static IObservable<T> IfFirstElements<T>(this IObservable<T> source,
ICollection<T> expectedFirstElements, IEqualityComparer<T> comparer = default)
{
comparer ??= EqualityComparer<T>.Default;
return source.Publish(published => published
.Zip(expectedFirstElements)
.TakeWhile(e => comparer.Equals(e.First, e.Second))
.Take(expectedFirstElements.Count)
.Select(e => e.First)
.ToList()
.SelectMany(list => list.Count == expectedFirstElements.Count ?
published.StartWith(list) : Observable.Empty<T>()));
}
附带说明一下,我的初衷是将 IEnumerable<T>
作为预期值的容器。我找不到一个解决方案,尽管它并没有急切地实现可枚举,所以我作弊并将问题改编为我手头的解决方案。因此容器的类型是 ICollection<T>
.