如何重复一个可观察序列直到它为空?

How to repeat an observable sequence until it's empty?

我有一个 IObservable<int> 序列,它在订阅的前 9 次发出一个项目,在以后的订阅中它什么都不发出并立即完成:

int counter = 0;
IObservable<int> source = Observable.Defer(() =>
{
    if (++counter < 10)
        return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
    else
        return Observable.Empty<int>();
});

现在我想重复这个序列直到它完成。所以我使用了 Repeat 运算符:

source
    .Repeat()
    .Do(x => Console.WriteLine(x), () => Console.WriteLine("Completed"))
    .Wait();

问题是此查询永远不会完成。 Repeat 一次又一次地订阅 source 序列直到永远。更糟糕的是,当 source 停止生成元素时,查询会进入无情的死循环,劫持 CPU 的一个核心(我的四核机器报告连续 CPU 使用率25%)。这是上面代码的输出:

1
2
3
4
5
6
7
8
9

我想要的是 Repeat 运算符的变体,它在 source 停止生成元素时停止重复 source。通过内置的 Rx 运算符搜索我可以看到一个 RepeatWhen 运算符,但显然这只能用于更快地开始下一次重复,而不是完全停止重复:

// Repeatedly resubscribes to the source observable after a normal completion and
// when the observable returned by a handler produces an arbitrary item.
public static IObservable<TSource> RepeatWhen<TSource, TSignal>(
    this IObservable<TSource> source,
    Func<IObservable<object>, IObservable<TSignal>> handler);

虽然我不是 100% 确定,因为 handler 参数的描述很晦涩,所以我可能遗漏了一些东西:

The function that is called for each observer and takes an observable sequence objects. It should return an observable of arbitrary items that should signal that arbitrary item in response to receiving the completion signal from the source observable. If this observable signals a terminal event, the sequence is terminated with that signal instead.

我的问题是: 我如何实现一个 RepeatUntilEmpty 运算符来重复 source 序列直到它为空?是否可以基于前面提到的 RepeatWhen 运算符来实现它?如果不是,我是否应该降低级别 (Observable.Create) 并从头开始重新实现基本的 Repeat 功能?或者我可以使用 Materialize 运算符来发挥我的优势,以某种方式将它与现有的 Repeat 结合起来吗?我现在没主意了。我愿意接受任何一种解决方案,无论是高杠杆还是低杠杆。

public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
    // What to do?
}

在我的原始代码中用 RepeatUntilEmpty 替换 Repeat 应该具有在发出 9 元素后立即完成查询的效果。

您确实可以根据从 Repeat() 语句收到的通知使用 Materialize()/Dematerialize() to build your own sequence of notifications。通知序列将如下所示:

1C 2C 3C 4C 5C 6C 7C 8C 9C C C C ...

所以我们寻找两个连续的 OnCompleted 通知。如果我们没有找到,我们仍然 return 收到 OnNext 通知,否则我们 return OnCompleted 通知。代码可能如下所示:

public static void Main(string[] args)
{
    int counter = 0;
    IObservable<int> source = Observable.Defer(() =>
    {
        Console.WriteLine($"counter is now: {counter}");
        if (counter > 20) {
            System.Environment.Exit(1);
        }
        if (++counter < 10)
            return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
        else
            return Observable.Empty<int>();
    });

    source
        .RepeatUntilEmpty()
        .Subscribe(x => {

                System.Threading.Thread.Sleep(10);
                Console.WriteLine($"SUBSCRIBE: {x}");
            }, () => Console.WriteLine("SUBSCRIBE:Completed"));

    System.Threading.Thread.Sleep(10000);
    Console.WriteLine("Main thread terminated");
}

使用RepeatUntilEmpty()方法如下:

public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
    return source
        .Materialize()
        .Repeat()
        .StartWith((Notification<T>)null)
        .Buffer(2, 1)
        .Select(it => {
            Console.WriteLine($"Buffer content: {String.Join(",", it)}");
            if (it[1].Kind != System.Reactive.NotificationKind.OnCompleted) {
                return it[1];
            }
            // it[1] is OnCompleted, check the previous one
            if (it[0] != null && it[0].Kind != System.Reactive.NotificationKind.OnCompleted) {
                // not a consecutive OnCompleted, so we ignore this OnCompleted with a NULL marker
                return null;
            }

            // okay, we have two consecutive OnCompleted, stop this observable.
            return it[1];
        })
        .Where(it => it != null) // remove the NULL marker
        .Dematerialize();
}

这将生成以下输出:

counter is now: 0
Buffer content: ,OnNext(1)
SUBSCRIBE: 1
Buffer content: OnNext(1),OnCompleted()
counter is now: 1
Buffer content: OnCompleted(),OnNext(2)
SUBSCRIBE: 2
Buffer content: OnNext(2),OnCompleted()
counter is now: 2
Buffer content: OnCompleted(),OnNext(3)
SUBSCRIBE: 3
Buffer content: OnNext(3),OnCompleted()
counter is now: 3
Buffer content: OnCompleted(),OnNext(4)
SUBSCRIBE: 4
Buffer content: OnNext(4),OnCompleted()
counter is now: 4
Buffer content: OnCompleted(),OnNext(5)
SUBSCRIBE: 5
Buffer content: OnNext(5),OnCompleted()
counter is now: 5
Buffer content: OnCompleted(),OnNext(6)
SUBSCRIBE: 6
Buffer content: OnNext(6),OnCompleted()
counter is now: 6
Buffer content: OnCompleted(),OnNext(7)
SUBSCRIBE: 7
Buffer content: OnNext(7),OnCompleted()
counter is now: 7
Buffer content: OnCompleted(),OnNext(8)
SUBSCRIBE: 8
Buffer content: OnNext(8),OnCompleted()
counter is now: 8
Buffer content: OnCompleted(),OnNext(9)
SUBSCRIBE: 9
Buffer content: OnNext(9),OnCompleted()
counter is now: 9
Buffer content: OnCompleted(),OnCompleted()
SUBSCRIBE:Completed
Main thread terminated

我尚未测试此代码如何处理 OnError() 通知,因此您可能需要检查一下。此外,我还遇到了 source.Materialize().Repeat() 部分将从原始来源读取更多数据的问题,即使它后来决定停止可观察的。特别是 Do().Wait() 语句我有时会收到额外的输出,如:

counter is now: 9
Buffer content: OnCompleted(),OnCompleted()
SUBSCRIBE: Completed
counter is now: 10
counter is now: 11
counter is now: 12
counter is now: 13
counter is now: 14

这对您来说可能也是个问题,因为 Repeat() 部分仍在尝试 read/concat 清空可观察对象。