如何重复一个可观察序列直到它为空?
How to repeat an observable sequence until it's empty?
我有一个 IObservable<int>
序列,它在订阅的前 9 次发出一个项目,在以后的订阅中它什么都不发出并立即完成:
int counter = 0;
IObservable<int> source = Observable.Defer(() =>
{
if (++counter < 10)
return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
else
return Observable.Empty<int>();
});
现在我想重复这个序列直到它完成。所以我使用了 Repeat
运算符:
source
.Repeat()
.Do(x => Console.WriteLine(x), () => Console.WriteLine("Completed"))
.Wait();
问题是此查询永远不会完成。 Repeat
一次又一次地订阅 source
序列直到永远。更糟糕的是,当 source
停止生成元素时,查询会进入无情的死循环,劫持 CPU 的一个核心(我的四核机器报告连续 CPU 使用率25%)。这是上面代码的输出:
1
2
3
4
5
6
7
8
9
我想要的是 Repeat
运算符的变体,它在 source
停止生成元素时停止重复 source
。通过内置的 Rx 运算符搜索我可以看到一个 RepeatWhen
运算符,但显然这只能用于更快地开始下一次重复,而不是完全停止重复:
// Repeatedly resubscribes to the source observable after a normal completion and
// when the observable returned by a handler produces an arbitrary item.
public static IObservable<TSource> RepeatWhen<TSource, TSignal>(
this IObservable<TSource> source,
Func<IObservable<object>, IObservable<TSignal>> handler);
虽然我不是 100% 确定,因为 handler
参数的描述很晦涩,所以我可能遗漏了一些东西:
The function that is called for each observer and takes an observable sequence objects. It should return an observable of arbitrary items that should signal that arbitrary item in response to receiving the completion signal from the source observable. If this observable signals a terminal event, the sequence is terminated with that signal instead.
我的问题是: 我如何实现一个 RepeatUntilEmpty
运算符来重复 source
序列直到它为空?是否可以基于前面提到的 RepeatWhen
运算符来实现它?如果不是,我是否应该降低级别 (Observable.Create
) 并从头开始重新实现基本的 Repeat
功能?或者我可以使用 Materialize
运算符来发挥我的优势,以某种方式将它与现有的 Repeat
结合起来吗?我现在没主意了。我愿意接受任何一种解决方案,无论是高杠杆还是低杠杆。
public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
// What to do?
}
在我的原始代码中用 RepeatUntilEmpty
替换 Repeat
应该具有在发出 9
元素后立即完成查询的效果。
您确实可以根据从 Repeat()
语句收到的通知使用 Materialize()
/Dematerialize()
to build your own sequence of notifications。通知序列将如下所示:
1C 2C 3C 4C 5C 6C 7C 8C 9C C C C ...
所以我们寻找两个连续的 OnCompleted
通知。如果我们没有找到,我们仍然 return 收到 OnNext
通知,否则我们 return OnCompleted
通知。代码可能如下所示:
public static void Main(string[] args)
{
int counter = 0;
IObservable<int> source = Observable.Defer(() =>
{
Console.WriteLine($"counter is now: {counter}");
if (counter > 20) {
System.Environment.Exit(1);
}
if (++counter < 10)
return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
else
return Observable.Empty<int>();
});
source
.RepeatUntilEmpty()
.Subscribe(x => {
System.Threading.Thread.Sleep(10);
Console.WriteLine($"SUBSCRIBE: {x}");
}, () => Console.WriteLine("SUBSCRIBE:Completed"));
System.Threading.Thread.Sleep(10000);
Console.WriteLine("Main thread terminated");
}
使用RepeatUntilEmpty()
方法如下:
public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
return source
.Materialize()
.Repeat()
.StartWith((Notification<T>)null)
.Buffer(2, 1)
.Select(it => {
Console.WriteLine($"Buffer content: {String.Join(",", it)}");
if (it[1].Kind != System.Reactive.NotificationKind.OnCompleted) {
return it[1];
}
// it[1] is OnCompleted, check the previous one
if (it[0] != null && it[0].Kind != System.Reactive.NotificationKind.OnCompleted) {
// not a consecutive OnCompleted, so we ignore this OnCompleted with a NULL marker
return null;
}
// okay, we have two consecutive OnCompleted, stop this observable.
return it[1];
})
.Where(it => it != null) // remove the NULL marker
.Dematerialize();
}
这将生成以下输出:
counter is now: 0
Buffer content: ,OnNext(1)
SUBSCRIBE: 1
Buffer content: OnNext(1),OnCompleted()
counter is now: 1
Buffer content: OnCompleted(),OnNext(2)
SUBSCRIBE: 2
Buffer content: OnNext(2),OnCompleted()
counter is now: 2
Buffer content: OnCompleted(),OnNext(3)
SUBSCRIBE: 3
Buffer content: OnNext(3),OnCompleted()
counter is now: 3
Buffer content: OnCompleted(),OnNext(4)
SUBSCRIBE: 4
Buffer content: OnNext(4),OnCompleted()
counter is now: 4
Buffer content: OnCompleted(),OnNext(5)
SUBSCRIBE: 5
Buffer content: OnNext(5),OnCompleted()
counter is now: 5
Buffer content: OnCompleted(),OnNext(6)
SUBSCRIBE: 6
Buffer content: OnNext(6),OnCompleted()
counter is now: 6
Buffer content: OnCompleted(),OnNext(7)
SUBSCRIBE: 7
Buffer content: OnNext(7),OnCompleted()
counter is now: 7
Buffer content: OnCompleted(),OnNext(8)
SUBSCRIBE: 8
Buffer content: OnNext(8),OnCompleted()
counter is now: 8
Buffer content: OnCompleted(),OnNext(9)
SUBSCRIBE: 9
Buffer content: OnNext(9),OnCompleted()
counter is now: 9
Buffer content: OnCompleted(),OnCompleted()
SUBSCRIBE:Completed
Main thread terminated
我尚未测试此代码如何处理 OnError()
通知,因此您可能需要检查一下。此外,我还遇到了 source.Materialize().Repeat()
部分将从原始来源读取更多数据的问题,即使它后来决定停止可观察的。特别是 Do().Wait()
语句我有时会收到额外的输出,如:
counter is now: 9
Buffer content: OnCompleted(),OnCompleted()
SUBSCRIBE: Completed
counter is now: 10
counter is now: 11
counter is now: 12
counter is now: 13
counter is now: 14
这对您来说可能也是个问题,因为 Repeat()
部分仍在尝试 read/concat 清空可观察对象。
我有一个 IObservable<int>
序列,它在订阅的前 9 次发出一个项目,在以后的订阅中它什么都不发出并立即完成:
int counter = 0;
IObservable<int> source = Observable.Defer(() =>
{
if (++counter < 10)
return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
else
return Observable.Empty<int>();
});
现在我想重复这个序列直到它完成。所以我使用了 Repeat
运算符:
source
.Repeat()
.Do(x => Console.WriteLine(x), () => Console.WriteLine("Completed"))
.Wait();
问题是此查询永远不会完成。 Repeat
一次又一次地订阅 source
序列直到永远。更糟糕的是,当 source
停止生成元素时,查询会进入无情的死循环,劫持 CPU 的一个核心(我的四核机器报告连续 CPU 使用率25%)。这是上面代码的输出:
1
2
3
4
5
6
7
8
9
我想要的是 Repeat
运算符的变体,它在 source
停止生成元素时停止重复 source
。通过内置的 Rx 运算符搜索我可以看到一个 RepeatWhen
运算符,但显然这只能用于更快地开始下一次重复,而不是完全停止重复:
// Repeatedly resubscribes to the source observable after a normal completion and
// when the observable returned by a handler produces an arbitrary item.
public static IObservable<TSource> RepeatWhen<TSource, TSignal>(
this IObservable<TSource> source,
Func<IObservable<object>, IObservable<TSignal>> handler);
虽然我不是 100% 确定,因为 handler
参数的描述很晦涩,所以我可能遗漏了一些东西:
The function that is called for each observer and takes an observable sequence objects. It should return an observable of arbitrary items that should signal that arbitrary item in response to receiving the completion signal from the source observable. If this observable signals a terminal event, the sequence is terminated with that signal instead.
我的问题是: 我如何实现一个 RepeatUntilEmpty
运算符来重复 source
序列直到它为空?是否可以基于前面提到的 RepeatWhen
运算符来实现它?如果不是,我是否应该降低级别 (Observable.Create
) 并从头开始重新实现基本的 Repeat
功能?或者我可以使用 Materialize
运算符来发挥我的优势,以某种方式将它与现有的 Repeat
结合起来吗?我现在没主意了。我愿意接受任何一种解决方案,无论是高杠杆还是低杠杆。
public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
// What to do?
}
在我的原始代码中用 RepeatUntilEmpty
替换 Repeat
应该具有在发出 9
元素后立即完成查询的效果。
您确实可以根据从 Repeat()
语句收到的通知使用 Materialize()
/Dematerialize()
to build your own sequence of notifications。通知序列将如下所示:
1C 2C 3C 4C 5C 6C 7C 8C 9C C C C ...
所以我们寻找两个连续的 OnCompleted
通知。如果我们没有找到,我们仍然 return 收到 OnNext
通知,否则我们 return OnCompleted
通知。代码可能如下所示:
public static void Main(string[] args)
{
int counter = 0;
IObservable<int> source = Observable.Defer(() =>
{
Console.WriteLine($"counter is now: {counter}");
if (counter > 20) {
System.Environment.Exit(1);
}
if (++counter < 10)
return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
else
return Observable.Empty<int>();
});
source
.RepeatUntilEmpty()
.Subscribe(x => {
System.Threading.Thread.Sleep(10);
Console.WriteLine($"SUBSCRIBE: {x}");
}, () => Console.WriteLine("SUBSCRIBE:Completed"));
System.Threading.Thread.Sleep(10000);
Console.WriteLine("Main thread terminated");
}
使用RepeatUntilEmpty()
方法如下:
public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
return source
.Materialize()
.Repeat()
.StartWith((Notification<T>)null)
.Buffer(2, 1)
.Select(it => {
Console.WriteLine($"Buffer content: {String.Join(",", it)}");
if (it[1].Kind != System.Reactive.NotificationKind.OnCompleted) {
return it[1];
}
// it[1] is OnCompleted, check the previous one
if (it[0] != null && it[0].Kind != System.Reactive.NotificationKind.OnCompleted) {
// not a consecutive OnCompleted, so we ignore this OnCompleted with a NULL marker
return null;
}
// okay, we have two consecutive OnCompleted, stop this observable.
return it[1];
})
.Where(it => it != null) // remove the NULL marker
.Dematerialize();
}
这将生成以下输出:
counter is now: 0
Buffer content: ,OnNext(1)
SUBSCRIBE: 1
Buffer content: OnNext(1),OnCompleted()
counter is now: 1
Buffer content: OnCompleted(),OnNext(2)
SUBSCRIBE: 2
Buffer content: OnNext(2),OnCompleted()
counter is now: 2
Buffer content: OnCompleted(),OnNext(3)
SUBSCRIBE: 3
Buffer content: OnNext(3),OnCompleted()
counter is now: 3
Buffer content: OnCompleted(),OnNext(4)
SUBSCRIBE: 4
Buffer content: OnNext(4),OnCompleted()
counter is now: 4
Buffer content: OnCompleted(),OnNext(5)
SUBSCRIBE: 5
Buffer content: OnNext(5),OnCompleted()
counter is now: 5
Buffer content: OnCompleted(),OnNext(6)
SUBSCRIBE: 6
Buffer content: OnNext(6),OnCompleted()
counter is now: 6
Buffer content: OnCompleted(),OnNext(7)
SUBSCRIBE: 7
Buffer content: OnNext(7),OnCompleted()
counter is now: 7
Buffer content: OnCompleted(),OnNext(8)
SUBSCRIBE: 8
Buffer content: OnNext(8),OnCompleted()
counter is now: 8
Buffer content: OnCompleted(),OnNext(9)
SUBSCRIBE: 9
Buffer content: OnNext(9),OnCompleted()
counter is now: 9
Buffer content: OnCompleted(),OnCompleted()
SUBSCRIBE:Completed
Main thread terminated
我尚未测试此代码如何处理 OnError()
通知,因此您可能需要检查一下。此外,我还遇到了 source.Materialize().Repeat()
部分将从原始来源读取更多数据的问题,即使它后来决定停止可观察的。特别是 Do().Wait()
语句我有时会收到额外的输出,如:
counter is now: 9
Buffer content: OnCompleted(),OnCompleted()
SUBSCRIBE: Completed
counter is now: 10
counter is now: 11
counter is now: 12
counter is now: 13
counter is now: 14
这对您来说可能也是个问题,因为 Repeat()
部分仍在尝试 read/concat 清空可观察对象。