不会多次调用 Observer OnError 委托
Observer OnError delegate is not called multiple times
您好,我想了解为什么 Observer 的 OnError 不会每次都被调用。
当我尝试调试 IObservable 实现中对 OnError 的调用时,调试器直接跳过了该调用。
可观察实现
/// <summary>
/// Storage that forwards every enqueued <see cref="SquareResult"/> to its subscribed observers.
/// </summary>
/// <remarks>
/// A result whose <c>Result</c> is negative is reported through <c>OnError</c>; any other
/// result is reported through <c>OnNext</c>. Every access to the observer list is made under
/// a single lock, and notifications are dispatched over a snapshot of the list, so an observer
/// may subscribe or unsubscribe from inside a callback without breaking the enumeration.
/// NOTE(review): the observer contract expects at most one terminal notification
/// (<c>OnError</c> or <c>OnCompleted</c>). This class does not enforce that and will call
/// <c>OnError</c> for every invalid result; observer wrappers that do enforce the contract
/// are expected to ignore everything after the first one - confirm against the consumer.
/// </remarks>
public class Reactive : IStorage, IObservable<SquareResult>
{
    // Guards every read and write of `observers`.
    private readonly object @lock = new object();
    private readonly List<IObserver<SquareResult>> observers = new List<IObserver<SquareResult>>();

    /// <summary>Creates a new, empty <see cref="Reactive"/> instance.</summary>
    public static Reactive Create()
    {
        return new Reactive();
    }

    /// <summary>
    /// Notifies every current observer about <paramref name="square"/>.
    /// </summary>
    /// <param name="square">The result to publish; a negative <c>Result</c> is published as an error.</param>
    public void Enqueue(SquareResult square)
    {
        // The lock is held for the whole dispatch (as before) so notifications stay serialized.
        lock (@lock)
        {
            // Iterate a copy: a callback that subscribes/unsubscribes re-enters the lock on the
            // same thread and mutates the list, which would otherwise invalidate the enumerator.
            foreach (var item in this.observers.ToArray())
            {
                if (square.Result < 0)
                {
                    item.OnError(new InvalidSquareException());
                }
                else
                {
                    item.OnNext(square);
                }
            }
        }
    }

    /// <summary>Signals completion to every current observer.</summary>
    public void EndStoring()
    {
        lock (@lock)
        {
            foreach (var item in this.observers.ToArray())
            {
                item.OnCompleted();
            }
        }
    }

    /// <summary>
    /// Registers <paramref name="observer"/> unless it is already registered.
    /// </summary>
    /// <param name="observer">The observer to notify.</param>
    /// <returns>A handle whose <c>Dispose</c> removes the observer again.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    public IDisposable Subscribe(IObserver<SquareResult> observer)
    {
        if (observer == null)
        {
            // A null entry would only fail later, in the middle of a dispatch.
            throw new ArgumentNullException(nameof(observer));
        }

        lock (@lock)
        {
            if (!this.observers.Contains(observer))
            {
                this.observers.Add(observer);
            }
        }

        return new Unsubscriber(observer, this.observers, this.@lock);
    }

    /// <summary>
    /// Subscription handle: disposing it removes the observer from the observer list.
    /// </summary>
    public class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<SquareResult>> observers;
        private readonly IObserver<SquareResult> observer;
        private readonly object gate;

        /// <param name="obs">The observer to remove on dispose.</param>
        /// <param name="observers">The list the observer is removed from.</param>
        /// <param name="gate">
        /// Lock object that protects <paramref name="observers"/>. Optional for backward
        /// compatibility; when omitted the removal is not synchronized with other users of the list.
        /// </param>
        public Unsubscriber(IObserver<SquareResult> obs, List<IObserver<SquareResult>> observers, object gate = null)
        {
            this.observer = obs;
            this.observers = observers;
            this.gate = gate ?? new object();
        }

        /// <summary>Removes the observer from the list; does nothing when no observer was given.</summary>
        public void Dispose()
        {
            if (this.observer == null)
            {
                return;
            }

            lock (this.gate)
            {
                this.observers.Remove(this.observer);
            }
        }
    }
}
生产者调用的唯一方法是 Enqueue
。
然后我们有一个服务,其中 consumer
从 IObservable
获取数据并通过套接字发送:
消费者
/// <summary>
/// Forwards notifications from an <see cref="IStorage"/> to a WebSocket client.
/// </summary>
class ProgressService
{
    private readonly IStorage observable;

    /// <param name="storage">The source whose notifications are forwarded.</param>
    public ProgressService(IStorage storage)
    {
        this.observable = storage;
    }

    /// <summary>
    /// Subscribes to the storage, sends every notification over <paramref name="socket"/>,
    /// and cancels the subscription once a receive on the socket finishes.
    /// </summary>
    /// <param name="socket">The WebSocket that receives the encoded notifications.</param>
    public async Task NotifyAsync(WebSocket socket)
    {
        byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
        try
        {
            using (CancellationTokenSource cancelSignal = new CancellationTokenSource())
            {
                try
                {
                    this.observable.Subscribe(async (next) =>
                    {
                        try
                        {
                            ReadOnlyMemory<byte> data = next.Encode();
                            await socket.SendAsync(data, WebSocketMessageType.Text, true, CancellationToken.None);
                        }
                        catch (Exception)
                        {
                            if (socket.State == WebSocketState.Open)
                            {
                                await socket.CloseAsync(WebSocketCloseStatus.InternalServerError, "Threw on data receive", CancellationToken.None);
                            }
                            return;
                        }
                    }, async (ex) =>
                    {
                        // NOTE(review): not invoked for every OnError the source raises. Under the
                        // observer contract the first OnError ends the sequence, so later ones are
                        // presumably dropped by the subscription - confirm.
                        await socket.SendAsync($"Exception :{ex.Message}".Encode().ToArray(), WebSocketMessageType.Text, true, CancellationToken.None);
                    }, async () =>
                    {
                        await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Finished test", CancellationToken.None);
                    }, cancelSignal.Token);

                    // Any incoming frame (or a close) ends the notification session.
                    await socket.ReceiveAsync(buffer, CancellationToken.None);
                }
                finally
                {
                    // Cancel even when the receive throws, so the subscription is not left behind.
                    cancelSignal.Cancel();
                }
            }
        }
        finally
        {
            // The receive has finished (or faulted), so the rented buffer can go back to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
        }
    }
}
所以基本上发生的情况是:如果我在某处多次调用 myobservable.OnError(new Exception()),consumer 的回调只会被调用一次,我不明白为什么。
例子:
// Raise two errors back-to-back on the same target.
// NOTE(review): `observable` is declared outside this snippet; the accompanying text
// reports that only the first exception message ends up being printed.
observable.OnError(new Exception());
observable.OnError(new Exception());
Observer.OnError实现
/// <summary>
/// Writes the message of the reported exception to the console.
/// </summary>
/// <param name="ex">The exception whose <c>Message</c> is printed.</param>
public void OnError(Exception ex) => Console.WriteLine(ex.Message);
我的代码中的上述示例只会打印一次异常消息。
根据 'Observable contract'(可观察对象契约),一个可观察对象可以发出 0 个或多个 OnNext 通知,随后最多发出一个 OnCompleted 或一个 OnError 通知,而该通知会终止可观察对象。一旦 observable 终止(在您的情况下是第一次 OnError),就不会再发出任何通知,也就是说委托不会再被调用。
您好,我想了解为什么 Observer 的 OnError 不会每次都被调用。
当我尝试调试 IObservable 实现中对 OnError 的调用时,调试器直接跳过了该调用。
可观察实现
/// <summary>
/// Storage that forwards every enqueued <see cref="SquareResult"/> to its subscribed observers.
/// </summary>
/// <remarks>
/// A result whose <c>Result</c> is negative is reported through <c>OnError</c>; any other
/// result is reported through <c>OnNext</c>. Every access to the observer list is made under
/// a single lock, and notifications are dispatched over a snapshot of the list, so an observer
/// may subscribe or unsubscribe from inside a callback without breaking the enumeration.
/// NOTE(review): the observer contract expects at most one terminal notification
/// (<c>OnError</c> or <c>OnCompleted</c>). This class does not enforce that and will call
/// <c>OnError</c> for every invalid result; observer wrappers that do enforce the contract
/// are expected to ignore everything after the first one - confirm against the consumer.
/// </remarks>
public class Reactive : IStorage, IObservable<SquareResult>
{
    // Guards every read and write of `observers`.
    private readonly object @lock = new object();
    private readonly List<IObserver<SquareResult>> observers = new List<IObserver<SquareResult>>();

    /// <summary>Creates a new, empty <see cref="Reactive"/> instance.</summary>
    public static Reactive Create()
    {
        return new Reactive();
    }

    /// <summary>
    /// Notifies every current observer about <paramref name="square"/>.
    /// </summary>
    /// <param name="square">The result to publish; a negative <c>Result</c> is published as an error.</param>
    public void Enqueue(SquareResult square)
    {
        // The lock is held for the whole dispatch (as before) so notifications stay serialized.
        lock (@lock)
        {
            // Iterate a copy: a callback that subscribes/unsubscribes re-enters the lock on the
            // same thread and mutates the list, which would otherwise invalidate the enumerator.
            foreach (var item in this.observers.ToArray())
            {
                if (square.Result < 0)
                {
                    item.OnError(new InvalidSquareException());
                }
                else
                {
                    item.OnNext(square);
                }
            }
        }
    }

    /// <summary>Signals completion to every current observer.</summary>
    public void EndStoring()
    {
        lock (@lock)
        {
            foreach (var item in this.observers.ToArray())
            {
                item.OnCompleted();
            }
        }
    }

    /// <summary>
    /// Registers <paramref name="observer"/> unless it is already registered.
    /// </summary>
    /// <param name="observer">The observer to notify.</param>
    /// <returns>A handle whose <c>Dispose</c> removes the observer again.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    public IDisposable Subscribe(IObserver<SquareResult> observer)
    {
        if (observer == null)
        {
            // A null entry would only fail later, in the middle of a dispatch.
            throw new ArgumentNullException(nameof(observer));
        }

        lock (@lock)
        {
            if (!this.observers.Contains(observer))
            {
                this.observers.Add(observer);
            }
        }

        return new Unsubscriber(observer, this.observers, this.@lock);
    }

    /// <summary>
    /// Subscription handle: disposing it removes the observer from the observer list.
    /// </summary>
    public class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<SquareResult>> observers;
        private readonly IObserver<SquareResult> observer;
        private readonly object gate;

        /// <param name="obs">The observer to remove on dispose.</param>
        /// <param name="observers">The list the observer is removed from.</param>
        /// <param name="gate">
        /// Lock object that protects <paramref name="observers"/>. Optional for backward
        /// compatibility; when omitted the removal is not synchronized with other users of the list.
        /// </param>
        public Unsubscriber(IObserver<SquareResult> obs, List<IObserver<SquareResult>> observers, object gate = null)
        {
            this.observer = obs;
            this.observers = observers;
            this.gate = gate ?? new object();
        }

        /// <summary>Removes the observer from the list; does nothing when no observer was given.</summary>
        public void Dispose()
        {
            if (this.observer == null)
            {
                return;
            }

            lock (this.gate)
            {
                this.observers.Remove(this.observer);
            }
        }
    }
}
生产者调用的唯一方法是 Enqueue
。
然后我们有一个服务,其中 consumer
从 IObservable
获取数据并通过套接字发送:
消费者
/// <summary>
/// Forwards notifications from an <see cref="IStorage"/> to a WebSocket client.
/// </summary>
class ProgressService
{
    private readonly IStorage observable;

    /// <param name="storage">The source whose notifications are forwarded.</param>
    public ProgressService(IStorage storage)
    {
        this.observable = storage;
    }

    /// <summary>
    /// Subscribes to the storage, sends every notification over <paramref name="socket"/>,
    /// and cancels the subscription once a receive on the socket finishes.
    /// </summary>
    /// <param name="socket">The WebSocket that receives the encoded notifications.</param>
    public async Task NotifyAsync(WebSocket socket)
    {
        byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
        try
        {
            using (CancellationTokenSource cancelSignal = new CancellationTokenSource())
            {
                try
                {
                    this.observable.Subscribe(async (next) =>
                    {
                        try
                        {
                            ReadOnlyMemory<byte> data = next.Encode();
                            await socket.SendAsync(data, WebSocketMessageType.Text, true, CancellationToken.None);
                        }
                        catch (Exception)
                        {
                            if (socket.State == WebSocketState.Open)
                            {
                                await socket.CloseAsync(WebSocketCloseStatus.InternalServerError, "Threw on data receive", CancellationToken.None);
                            }
                            return;
                        }
                    }, async (ex) =>
                    {
                        // NOTE(review): not invoked for every OnError the source raises. Under the
                        // observer contract the first OnError ends the sequence, so later ones are
                        // presumably dropped by the subscription - confirm.
                        await socket.SendAsync($"Exception :{ex.Message}".Encode().ToArray(), WebSocketMessageType.Text, true, CancellationToken.None);
                    }, async () =>
                    {
                        await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Finished test", CancellationToken.None);
                    }, cancelSignal.Token);

                    // Any incoming frame (or a close) ends the notification session.
                    await socket.ReceiveAsync(buffer, CancellationToken.None);
                }
                finally
                {
                    // Cancel even when the receive throws, so the subscription is not left behind.
                    cancelSignal.Cancel();
                }
            }
        }
        finally
        {
            // The receive has finished (or faulted), so the rented buffer can go back to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
        }
    }
}
所以基本上发生的情况是:如果我在某处多次调用 myobservable.OnError(new Exception()),consumer 的回调只会被调用一次,我不明白为什么。
例子:
// Raise two errors back-to-back on the same target.
// NOTE(review): `observable` is declared outside this snippet; the accompanying text
// reports that only the first exception message ends up being printed.
observable.OnError(new Exception());
observable.OnError(new Exception());
Observer.OnError实现
/// <summary>
/// Writes the message of the reported exception to the console.
/// </summary>
/// <param name="ex">The exception whose <c>Message</c> is printed.</param>
public void OnError(Exception ex) => Console.WriteLine(ex.Message);
我的代码中的上述示例只会打印一次异常消息。
根据 'Observable contract'(可观察对象契约),一个可观察对象可以发出 0 个或多个 OnNext 通知,随后最多发出一个 OnCompleted 或一个 OnError 通知,而该通知会终止可观察对象。一旦 observable 终止(在您的情况下是第一次 OnError),就不会再发出任何通知,也就是说委托不会再被调用。