Combination of Observable.FromAsync+Repeat+TakeWhile creates infinite loop
Can someone explain why the following AsObservable method creates an infinite loop even after the end of the stream has been reached?
using System;
using System.IO;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class StreamExt {
    public static IObservable<byte> AsObservable(this Stream stream, int bufferSize) {
        return Observable
            .FromAsync(cancel => stream.ReadBytes(bufferSize, cancel))
            .Repeat()
            .TakeWhile(bytes => bytes != null) // EndOfStream
            .SelectMany(bytes => bytes);
    }

    // Reads one chunk; returns null once the stream is exhausted.
    private static async Task<byte[]> ReadBytes(this Stream stream, int bufferSize, CancellationToken cancel) {
        var buf = new byte[bufferSize];
        var bytesRead = await stream
            .ReadAsync(buf, 0, bufferSize, cancel)
            .ConfigureAwait(false);
        if (bytesRead < 1) return null; // EndOfStream
        var result_size = Math.Min(bytesRead, bufferSize);
        Array.Resize(ref buf, result_size);
        return buf;
    }
}
A quick test shows that it produces an infinite loop:
class Program {
    static void Main(string[] args) {
        using (var stream = new MemoryStream(new byte[] { 1, 2, 3 })) {
            var testResult = stream
                .AsObservable(1024)
                .ToEnumerable()
                .ToArray();
            Console.WriteLine(testResult.Length);
        }
    }
}
Of course I could add a .SubscribeOn(TaskPoolScheduler.Default), but the infinite loop remains (it blocks a task pool scheduler thread and keeps reading from the Stream indefinitely).
[Update 2017-05-09]
Shlomo posted a better example to reproduce this issue:
int i = 0;
var testResult = Observable.FromAsync(() => Task.FromResult(i++))
    .Repeat()
    .TakeWhile(l => l < 3);
testResult.Subscribe(b => Console.WriteLine(b), e => { }, () => Console.WriteLine("OnCompleted"));
Console.WriteLine("This is never printed.");
You can check whether OnCompleted is being produced correctly like this:
using (var stream = new MemoryStream(new byte[] { 1, 2, 3 }))
{
    var testResult = stream
        .AsObservable(1024)
        ;
    testResult.Subscribe(b => Console.WriteLine(b), e => {}, () => Console.WriteLine("OnCompleted"));
}
The combination of .FromAsync and .Repeat appears to be the problem. The following code behaves similarly:
int i = 0;
var testResult = Observable.FromAsync(() => Task.FromResult(i++))
    .Repeat()
    .TakeWhile(l => l < 3)
    ;
testResult.Subscribe(b => Console.WriteLine(b), e => { }, () => Console.WriteLine("OnCompleted"));
Console.WriteLine("This is never printed.");
...whereas this code terminates correctly:
var testResult = Observable.Generate(0, i => true, i => i + 1, i => i)
    .Repeat()
    .TakeWhile(l => l < 3)
    ;
testResult.Subscribe(b => Console.WriteLine(b), e => { }, () => Console.WriteLine("OnCompleted"));
Console.WriteLine("This is printed.");
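If the goal is only to expose a Stream as an IObservable<byte>, one way to avoid depending on how FromAsync and Repeat interact is to write the read loop explicitly inside Observable.Create. This is a different technique from the one in the question, offered as a rough sketch rather than a drop-in replacement; the names StreamLoopExt and AsObservableLoop are made up for illustration and do not come from this thread.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;

public static class StreamLoopExt
{
    // Hypothetical helper (name is an assumption, not from the original post).
    // Reads the stream chunk by chunk and pushes each byte to the observer.
    public static IObservable<byte> AsObservableLoop(this Stream stream, int bufferSize)
    {
        return Observable.Create<byte>(async (observer, cancel) =>
        {
            var buf = new byte[bufferSize];
            while (!cancel.IsCancellationRequested)
            {
                var bytesRead = await stream
                    .ReadAsync(buf, 0, bufferSize, cancel)
                    .ConfigureAwait(false);
                if (bytesRead < 1) break; // EndOfStream

                for (var i = 0; i < bytesRead; i++)
                    observer.OnNext(buf[i]);
            }
            // No explicit OnCompleted: when the task returned by this
            // delegate finishes, Rx completes the sequence for the observer.
        });
    }
}
```

Because the loop ends by itself at end of stream, there is no Repeat/TakeWhile pair that has to be unsubscribed in time. The cancellation token is tied to the subscription, so disposing the subscription requests that the loop stop.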
The link posted by Shane Neuville contains an explanation of this behavior:
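For anyone who ends up here and needs an answer, not just an explanation: the problem appears to be the default scheduler of FromAsync, as shown in this self-answered question. If you switch to the "current thread" scheduler, Repeat().TakeWhile(...) behaves more predictably. For example (excerpt from the question):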
    .FromAsync(cancel => stream.ReadBytes(bufferSize, cancel),
               System.Reactive.Concurrency.Scheduler.CurrentThread)
    .Repeat()
    .TakeWhile(bytes => bytes != null) // EndOfStream
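For completeness, here is how that excerpt might look when applied to the whole extension class from the question. It is a sketch, assuming a System.Reactive version that provides the FromAsync overload taking an IScheduler (the excerpt above relies on it). The only intended change from the question's code is the scheduler argument.

```csharp
using System;
using System.IO;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class StreamExt
{
    public static IObservable<byte> AsObservable(this Stream stream, int bufferSize)
    {
        return Observable
            // Passing Scheduler.CurrentThread is the suggested fix; the rest
            // of the pipeline is unchanged from the question.
            .FromAsync(cancel => stream.ReadBytes(bufferSize, cancel),
                       Scheduler.CurrentThread)
            .Repeat()
            .TakeWhile(bytes => bytes != null) // EndOfStream
            .SelectMany(bytes => bytes);
    }

    // Reads one chunk; returns null once the stream is exhausted.
    private static async Task<byte[]> ReadBytes(this Stream stream, int bufferSize, CancellationToken cancel)
    {
        var buf = new byte[bufferSize];
        var bytesRead = await stream
            .ReadAsync(buf, 0, bufferSize, cancel)
            .ConfigureAwait(false);
        if (bytesRead < 1) return null; // EndOfStream
        Array.Resize(ref buf, bytesRead);
        return buf;
    }
}
```

Re-running the quick test from the question (the MemoryStream with three bytes, followed by ToEnumerable().ToArray()) against this version is a reasonable way to check that the sequence now completes in your environment.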