Reactive extensions(Rx) Switch() 产生新的 observable,它没有订阅提供的 OnCompleted()

Reactive extensions(Rx) Switch() produces new observable which is not subscribed to provided OnCompleted()

我在使用 Switch 语句订阅 Rx 时遇到问题。

_performSearchSubject
            .AsObservable()
            .Select(_ => PerformQuery())
            .Switch()
            .ObserveOn(_synchronizationContextService.SynchronizationContext)
            .Subscribe(DataArrivedForPositions, PositionQueryError, PositionQueryCompleted)
            .DisposeWith(this);

流量为:

  1. 一些属性发生变化,performSearchSubject.OnNext 被调用
  2. PerformPositionQuery() 被调用,returns 每次被击中时一个观察者
  3. 通过该观察者响应的服务在数据接收完成时调用两次 OnNext 和一次 OnCompleted
  4. 方法 DataArrivedForPositions 按预期被调用了两次
  5. 方法 PositionQueryCompleted 从未被调用,尽管在我的数据服务中调用了 observer.OnCompleted()。

dataService 的代码是:

        protected override void Request(Request request, IObserver<Response> observer)
        {
            query.Arrive += p => QueryReceive(request.RequestId, p, observer, query);
            query.Error += (type, s, message) => QueryError(observer, message);
            query.NoMoreData += id => QueryCompleted(observer);

            query.Execute(request);
        }

        private void QueryError(IObserver<PositionSheetResponse> observer, string message)
        {
            observer.OnError(new Exception(message));
        }

        private void QueryCompleted(IObserver<PositionSheetResponse> observer)
        {
            observer.OnCompleted();
        }

        private void QueryReceive(Guid requestId, Qry0079Receive receiveData, IObserver<PositionSheetResponse> observer, IQry0079PositionSheet query)
        {
            observer.OnNext(ConvertToResponse(requestId, receiveData));
        }

Switch 结果只会在您的外部可观察对象 (_performSearchSubject) 完成时完成。我假设在你的情况下这个永远不会(它可能绑定到执行搜索的用户操作)。

不清楚的是何时 您希望PositionQueryCompleted 被调用。如果在处理完每一个成功的查询之后,那么你的流需要修改,因为 Switch 丢失了查询流完成的信息,但它也缺少关于 UI 的信息(错误的调度程序even) 说它的数据是否真的被处理过。

可能有其他方法可以实现它,但基本上您希望您的查询流完成 通过 Switch 生存 (目前忽略此事件)。例如,您可以将查询流转换为具有 n+1 个事件,并为完整的事件增加一个:

    _performSearchSubject
        .AsObservable()
        .Select(_ => 
                  PerformQuery()
                  .Select(Data => new { Data, Complete = false})
                  .Concat(Observable.Return(new { Data = (string)null, Complete = true })))

您可以安全地申请.Switch().ObserveOn(_synchronizationContextService.SynchronizationContext),但是您需要修改您的订阅:

    .Subscribe(data => {
        if (data.Complete) DataArrivedForPositions(data.Data);
        else PositionQueryCompleted()
    }, PositionQueryError)