Combination of Observable.FromAsync+Repeat+TakeWhile creates infinite loop

Can someone explain why the following AsObservable method creates an infinite loop even after the end of the stream has been reached?

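using System;
using System.IO;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class StreamExt {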
    public static IObservable<byte> AsObservable(this Stream stream, int bufferSize) {
        return Observable
            .FromAsync(cancel => stream.ReadBytes(bufferSize, cancel))
            .Repeat()
            .TakeWhile(bytes => bytes != null) // EndOfStream
            .SelectMany(bytes => bytes);
    }

    private static async Task<byte[]> ReadBytes(this Stream stream, int bufferSize, CancellationToken cancel) {
        var buf = new byte[bufferSize];
        var bytesRead = await stream
            .ReadAsync(buf, 0, bufferSize, cancel)
            .ConfigureAwait(false);

        if (bytesRead < 1) return null; // EndOfStream
        var result_size = Math.Min(bytesRead, bufferSize);
        Array.Resize(ref buf, result_size);
        return buf;
    }
}

A quick test shows that it produces an infinite loop:

class Program {
    static void Main(string[] args) {
        using (var stream = new MemoryStream(new byte[] { 1, 2, 3 })) {
            var testResult = stream
                .AsObservable(1024)
                .ToEnumerable()
                .ToArray();
            Console.WriteLine(testResult.Length);
        }
    }
}

Of course I could add a .SubscribeOn(TaskPoolScheduler.Default), but the infinite loop remains (it blocks a task pool scheduler thread and keeps reading from the Stream forever).

[Update 2017-05-09]

Shlomo posted a better example to reproduce this issue:

int i = 0;
var testResult = Observable.FromAsync(() => Task.FromResult(i++))
    .Repeat()
    .TakeWhile(l => l < 3);
testResult.Subscribe(b => Console.WriteLine(b), e => { }, () => Console.WriteLine("OnCompleted"));
Console.WriteLine("This is never printed.");

You can confirm that OnCompleted is being produced correctly with this:

using (var stream = new MemoryStream(new byte[] { 1, 2, 3 }))
{
    var testResult = stream
        .AsObservable(1024);
    testResult.Subscribe(b => Console.WriteLine(b), e => {}, () => Console.WriteLine("OnCompleted"));
}

There seems to be a problem with the .FromAsync + .Repeat combination. The following code behaves similarly:

int i = 0;
var testResult = Observable.FromAsync(() => Task.FromResult(i++))
    .Repeat()
    .TakeWhile(l => l < 3);
testResult.Subscribe(b => Console.WriteLine(b), e => { }, () => Console.WriteLine("OnCompleted"));
Console.WriteLine("This is never printed.");

...whereas this code terminates correctly:

var testResult = Observable.Generate(0, i => true, i => i + 1, i => i)
    .Repeat()
    .TakeWhile(l => l < 3);
testResult.Subscribe(b => Console.WriteLine(b), e => { }, () => Console.WriteLine("OnCompleted"));
Console.WriteLine("This is printed.");

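The link posted by Shane Neuville contains an explanation of this behavior:

For anyone who ends up here and needs an answer, not just an explanation: the problem appears to be FromAsync's default scheduler, as shown in this self-answered question. If you switch to the "current thread" scheduler, Repeat().TakeWhile(...) behaves more predictably. For example (excerpt from the question):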

.FromAsync(cancel => stream.ReadBytes(bufferSize, cancel), 
    System.Reactive.Concurrency.Scheduler.CurrentThread)
.Repeat()
.TakeWhile(bytes => bytes != null) // EndOfStream
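
For completeness, here is a sketch of how the excerpt above might fit back into the full extension method from the question. It assumes your System.Reactive version provides the FromAsync overload that takes an IScheduler. The class name StreamExtCurrentThread and the method name AsObservableOnCurrentThread are made up for this example so they do not clash with the original StreamExt. I have not verified this against every Rx release, so treat it as a starting point rather than a guaranteed fix.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical class name; the pipeline is the one from the question.
public static class StreamExtCurrentThread
{
    // Hypothetical method name. The only change from the question's
    // AsObservable is the extra scheduler argument passed to FromAsync.
    public static IObservable<byte> AsObservableOnCurrentThread(this Stream stream, int bufferSize)
    {
        return Observable
            .FromAsync(cancel => stream.ReadBytes(bufferSize, cancel), Scheduler.CurrentThread)
            .Repeat()
            .TakeWhile(bytes => bytes != null) // null marks EndOfStream
            .SelectMany(bytes => bytes);
    }

    // Reads up to bufferSize bytes; returns null once the stream is exhausted.
    private static async Task<byte[]> ReadBytes(this Stream stream, int bufferSize, CancellationToken cancel)
    {
        var buf = new byte[bufferSize];
        var bytesRead = await stream
            .ReadAsync(buf, 0, bufferSize, cancel)
            .ConfigureAwait(false);

        if (bytesRead < 1) return null; // EndOfStream
        Array.Resize(ref buf, bytesRead); // trim the buffer to what was actually read
        return buf;
    }
}

public static class Program
{
    public static void Main()
    {
        // Same quick test as in the question, using the scheduler-aware variant.
        using (var stream = new MemoryStream(new byte[] { 1, 2, 3 }))
        {
            var testResult = stream
                .AsObservableOnCurrentThread(1024)
                .ToEnumerable()
                .ToArray();
            Console.WriteLine(testResult.Length);
        }
    }
}
```

If the scheduler change has the intended effect, the quick test should return instead of hanging, and the length it prints should match the number of bytes in the MemoryStream.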