Rx .net 主题 OnNext 异常正在失去下游观察者

Rx .net subject OnNext exception is losing downstream observers

免责声明:我是 Rx.Net 的新手。

我想了解使用 Rx.Net 从服务器使用事件的最佳方式。目前,我有一个包含 rx 主题的消费者 class,将消费的更新委托给下游消费者,如下所示:

Event Listener/Processor:

public IObservable<IUpdate> UpdateStream => _subject?.AsObservable();

try
{
    // ... processing ... 
    _subject.OnNext(update); // update is the variable
}
catch (Exception ex)
{
    _subject.OnError(ex);
}

下游订阅者:

public void Subscribe()
{
  _eventListener.UpdateStream.Subscribe(update => 
  {
       _fooProcessor.Process(update);
  },
  ex => 
  {
     // log
     Subscribe(); // an effort to resubscribe lost subscription
  },
  () => { // log completion (optional)...}
}

我注意到subject throws exception onNext(已经添加了相同key的item),其中,subject.HasObservers 属性为false(即下游订阅列表丢失)。 OnError 代码行确实命中,但下游订阅者没有收到通知(因为订阅丢失)。

我尝试使用 Observer.EventPattern 来监听消费事件并创建供下游订阅者消费的可观察对象;但这也不起作用(我无法评估这种情况下的失败点)。

在这种情况下,是否有从下游消费者(在不同的 dll 上)重新订阅的模式?

感谢任何帮助。 谢谢!

我发现下游订阅者抛出异常,导致订阅中断。现在这不是问题。

谢谢 - How to handle exceptions in OnNext when using ObserveOn?