Observable.Repeat 势不可挡,是 bug 还是 feature?
The Observable.Repeat is unstoppable, is it a bug or a feature?
我注意到 Repeat
operator, when the source observable's notifications are synchronous. The resulting observable cannot be stopped with a subsequent TakeWhile
运算符的行为有些奇怪,并且显然会一直持续 运行。为了演示,我创建了一个生成单个值的源可观察对象,该值在每次订阅时递增。第一个订阅者获得值 1,第二个订阅者获得值 2 等等:
int incrementalValue = 0;
var incremental = Observable.Create<int>(async o =>
{
await Task.CompletedTask;
//await Task.Yield();
Thread.Sleep(100);
var value = Interlocked.Increment(ref incrementalValue);
o.OnNext(value);
o.OnCompleted();
});
然后我将运算符 Repeat
、TakeWhile
和 LastAsync
附加到这个 observable,这样程序将等待组合的 observable 产生它的最后一个值:
incremental.Repeat()
.Do(new CustomObserver("Checkpoint A"))
.TakeWhile(item => item <= 5)
.Do(new CustomObserver("Checkpoint B"))
.LastAsync()
.Do(new CustomObserver("Checkpoint C"))
.Wait();
Console.WriteLine($"Done");
class CustomObserver : IObserver<int>
{
private readonly string _name;
public CustomObserver(string name) => _name = name;
public void OnNext(int value) => Console.WriteLine($"{_name}: {value}");
public void OnError(Exception ex) => Console.WriteLine($"{_name}: {ex.Message}");
public void OnCompleted() => Console.WriteLine($"{_name}: Completed");
}
这是这个程序的输出:
Checkpoint A: 1
Checkpoint B: 1
Checkpoint A: 2
Checkpoint B: 2
Checkpoint A: 3
Checkpoint B: 3
Checkpoint A: 4
Checkpoint B: 4
Checkpoint A: 5
Checkpoint B: 5
Checkpoint A: 6
Checkpoint B: Completed
Checkpoint C: 5
Checkpoint C: Completed
Checkpoint A: 7
Checkpoint A: 8
Checkpoint A: 9
Checkpoint A: 10
Checkpoint A: 11
Checkpoint A: 12
Checkpoint A: 13
Checkpoint A: 14
Checkpoint A: 15
Checkpoint A: 16
Checkpoint A: 17
...
它永远不会结束!虽然 LastAsync
已经产生了它的值并完成了,但是 Repeat
运算符还在不停地旋转!
只有当源可观察对象同步通知其订阅者时才会发生这种情况。例如,取消注释行 //await Task.Yield();
后,程序的行为符合预期:
Checkpoint A: 1
Checkpoint B: 1
Checkpoint A: 2
Checkpoint B: 2
Checkpoint A: 3
Checkpoint B: 3
Checkpoint A: 4
Checkpoint B: 4
Checkpoint A: 5
Checkpoint B: 5
Checkpoint A: 6
Checkpoint B: Completed
Checkpoint C: 5
Checkpoint C: Completed
Done
Repeat
运算符停止旋转,尽管它没有报告完成(我猜它已被取消订阅)。
有没有什么方法可以使 Repeat
运算符的行为保持一致,而不管它收到的通知类型(同步或异步)如何?
.NET Core 3.0,C# 8,System.Reactive 4.3.2,控制台应用程序
您可能希望 Repeat
的实现以 OnCompleted
通知为特色,但它在 Concat
-ing 无限流方面变成 it's implemented。
public static IObservable<TSource> Repeat<TSource>(this IObservable<TSource> source) =>
RepeatInfinite(source).Concat();
private static IEnumerable<T> RepeatInfinite<T>(T value)
{
while (true)
{
yield return value;
}
}
随着责任转移到 Concat
- 我们可以创建一个简化版本(血淋淋的实施细节在 TailRecursiveSink.cs
中)。除非 await Task.Yield()
.
提供了不同的执行上下文,否则它仍然会继续旋转
public static IObservable<T> ConcatEx<T>(this IEnumerable<IObservable<T>> enumerable) =>
Observable.Create<T>(observer =>
{
var check = new BooleanDisposable();
IDisposable loopRec(IScheduler inner, IEnumerator<IObservable<T>> enumerator)
{
if (check.IsDisposed)
return Disposable.Empty;
if (enumerator.MoveNext()) //this never returns false
return enumerator.Current.Subscribe(
observer.OnNext,
() => inner.Schedule(enumerator, loopRec) //<-- starts next immediately
);
else
return inner.Schedule(observer.OnCompleted); //this never runs
}
Scheduler.Immediate.Schedule(enumerable.GetEnumerator(), loopRec); //this runs forever
return check;
});
作为无限流,enumerator.MoveNext()
始终 returns 为真,因此另一个分支永远不会运行 - 这是预期的;这不是我们的问题。
当调用o.OnCompleted()
时,它会立即安排下一个迭代循环
Schedule(enumerator, loopRec)
同步调用下一个 o.OnCompleted()
,并且它无限地继续 - 没有任何地方可以逃脱此递归。
如果用await Task.Yield()
进行上下文切换,那么Schedule(enumerator, loopRec)
会立即退出,o.OnCompleted()
会被非同步调用
Repeat
和 Concat
使用当前线程在不更改上下文的情况下工作 - 这不是不正确的行为,但是当相同的上下文也用于推送通知时,它可能会导致僵局或陷入困境
永久蹦床。
带注释的调用堆栈
[External Code]
Main.AnonymousMethod__0(o) //o.OnCompleted();
[External Code]
ConcatEx.__loopRec|1(inner, enumerator) //return enumerator.Current.Subscribe(...)
[External Code]
ConcatEx.AnonymousMethod__2() //inner.Schedule(enumerator, loopRec)
[External Code]
Main.AnonymousMethod__0(o) //o.OnCompleted();
[External Code]
ConcatEx.__loopRec|1(inner, enumerator) //return enumerator.Current.Subscribe(...)
[External Code]
ConcatEx.AnonymousMethod__0(observer) //Scheduler.Immediate.Schedule(...)
[External Code]
Main(args) //incremental.RepeatEx()...
我注意到 Repeat
operator, when the source observable's notifications are synchronous. The resulting observable cannot be stopped with a subsequent TakeWhile
运算符的行为有些奇怪,并且显然会一直持续 运行。为了演示,我创建了一个生成单个值的源可观察对象,该值在每次订阅时递增。第一个订阅者获得值 1,第二个订阅者获得值 2 等等:
int incrementalValue = 0;
var incremental = Observable.Create<int>(async o =>
{
await Task.CompletedTask;
//await Task.Yield();
Thread.Sleep(100);
var value = Interlocked.Increment(ref incrementalValue);
o.OnNext(value);
o.OnCompleted();
});
然后我将运算符 Repeat
、TakeWhile
和 LastAsync
附加到这个 observable,这样程序将等待组合的 observable 产生它的最后一个值:
incremental.Repeat()
.Do(new CustomObserver("Checkpoint A"))
.TakeWhile(item => item <= 5)
.Do(new CustomObserver("Checkpoint B"))
.LastAsync()
.Do(new CustomObserver("Checkpoint C"))
.Wait();
Console.WriteLine($"Done");
class CustomObserver : IObserver<int>
{
private readonly string _name;
public CustomObserver(string name) => _name = name;
public void OnNext(int value) => Console.WriteLine($"{_name}: {value}");
public void OnError(Exception ex) => Console.WriteLine($"{_name}: {ex.Message}");
public void OnCompleted() => Console.WriteLine($"{_name}: Completed");
}
这是这个程序的输出:
Checkpoint A: 1
Checkpoint B: 1
Checkpoint A: 2
Checkpoint B: 2
Checkpoint A: 3
Checkpoint B: 3
Checkpoint A: 4
Checkpoint B: 4
Checkpoint A: 5
Checkpoint B: 5
Checkpoint A: 6
Checkpoint B: Completed
Checkpoint C: 5
Checkpoint C: Completed
Checkpoint A: 7
Checkpoint A: 8
Checkpoint A: 9
Checkpoint A: 10
Checkpoint A: 11
Checkpoint A: 12
Checkpoint A: 13
Checkpoint A: 14
Checkpoint A: 15
Checkpoint A: 16
Checkpoint A: 17
...
它永远不会结束!虽然 LastAsync
已经产生了它的值并完成了,但是 Repeat
运算符还在不停地旋转!
只有当源可观察对象同步通知其订阅者时才会发生这种情况。例如,取消注释行 //await Task.Yield();
后,程序的行为符合预期:
Checkpoint A: 1
Checkpoint B: 1
Checkpoint A: 2
Checkpoint B: 2
Checkpoint A: 3
Checkpoint B: 3
Checkpoint A: 4
Checkpoint B: 4
Checkpoint A: 5
Checkpoint B: 5
Checkpoint A: 6
Checkpoint B: Completed
Checkpoint C: 5
Checkpoint C: Completed
Done
Repeat
运算符停止旋转,尽管它没有报告完成(我猜它已被取消订阅)。
有没有什么方法可以使 Repeat
运算符的行为保持一致,而不管它收到的通知类型(同步或异步)如何?
.NET Core 3.0,C# 8,System.Reactive 4.3.2,控制台应用程序
您可能希望 Repeat
的实现以 OnCompleted
通知为特色,但它在 Concat
-ing 无限流方面变成 it's implemented。
public static IObservable<TSource> Repeat<TSource>(this IObservable<TSource> source) =>
RepeatInfinite(source).Concat();
private static IEnumerable<T> RepeatInfinite<T>(T value)
{
while (true)
{
yield return value;
}
}
随着责任转移到 Concat
- 我们可以创建一个简化版本(血淋淋的实施细节在 TailRecursiveSink.cs
中)。除非 await Task.Yield()
.
public static IObservable<T> ConcatEx<T>(this IEnumerable<IObservable<T>> enumerable) =>
Observable.Create<T>(observer =>
{
var check = new BooleanDisposable();
IDisposable loopRec(IScheduler inner, IEnumerator<IObservable<T>> enumerator)
{
if (check.IsDisposed)
return Disposable.Empty;
if (enumerator.MoveNext()) //this never returns false
return enumerator.Current.Subscribe(
observer.OnNext,
() => inner.Schedule(enumerator, loopRec) //<-- starts next immediately
);
else
return inner.Schedule(observer.OnCompleted); //this never runs
}
Scheduler.Immediate.Schedule(enumerable.GetEnumerator(), loopRec); //this runs forever
return check;
});
作为无限流,enumerator.MoveNext()
始终 returns 为真,因此另一个分支永远不会运行 - 这是预期的;这不是我们的问题。
当调用o.OnCompleted()
时,它会立即安排下一个迭代循环
Schedule(enumerator, loopRec)
同步调用下一个 o.OnCompleted()
,并且它无限地继续 - 没有任何地方可以逃脱此递归。
如果用await Task.Yield()
进行上下文切换,那么Schedule(enumerator, loopRec)
会立即退出,o.OnCompleted()
会被非同步调用
Repeat
和 Concat
使用当前线程在不更改上下文的情况下工作 - 这不是不正确的行为,但是当相同的上下文也用于推送通知时,它可能会导致僵局或陷入困境
永久蹦床。
带注释的调用堆栈
[External Code]
Main.AnonymousMethod__0(o) //o.OnCompleted();
[External Code]
ConcatEx.__loopRec|1(inner, enumerator) //return enumerator.Current.Subscribe(...)
[External Code]
ConcatEx.AnonymousMethod__2() //inner.Schedule(enumerator, loopRec)
[External Code]
Main.AnonymousMethod__0(o) //o.OnCompleted();
[External Code]
ConcatEx.__loopRec|1(inner, enumerator) //return enumerator.Current.Subscribe(...)
[External Code]
ConcatEx.AnonymousMethod__0(observer) //Scheduler.Immediate.Schedule(...)
[External Code]
Main(args) //incremental.RepeatEx()...