Reactive extensions(Rx) Switch() 产生新的 observable,它没有订阅提供的 OnCompleted()
Reactive extensions(Rx) Switch() produces new observable which is not subscribed to provided OnCompleted()
我在使用 Switch 语句订阅 Rx 时遇到问题。
_performSearchSubject
.AsObservable()
.Select(_ => PerformQuery())
.Switch()
.ObserveOn(_synchronizationContextService.SynchronizationContext)
.Subscribe(DataArrivedForPositions, PositionQueryError, PositionQueryCompleted)
.DisposeWith(this);
流量为:
- 一些属性发生变化,performSearchSubject.OnNext 被调用
- PerformPositionQuery() 被调用,returns 每次被击中时一个观察者
- 通过该观察者响应的服务在数据接收完成时调用两次 OnNext 和一次 OnCompleted
- 方法 DataArrivedForPositions 按预期被调用了两次
- 方法 PositionQueryCompleted 从未被调用,尽管在我的数据服务中调用了 observer.OnCompleted()。
dataService 的代码是:
protected override void Request(Request request, IObserver<Response> observer)
{
query.Arrive += p => QueryReceive(request.RequestId, p, observer, query);
query.Error += (type, s, message) => QueryError(observer, message);
query.NoMoreData += id => QueryCompleted(observer);
query.Execute(request);
}
private void QueryError(IObserver<PositionSheetResponse> observer, string message)
{
observer.OnError(new Exception(message));
}
private void QueryCompleted(IObserver<PositionSheetResponse> observer)
{
observer.OnCompleted();
}
private void QueryReceive(Guid requestId, Qry0079Receive receiveData, IObserver<PositionSheetResponse> observer, IQry0079PositionSheet query)
{
observer.OnNext(ConvertToResponse(requestId, receiveData));
}
Switch
结果只会在您的外部可观察对象 (_performSearchSubject
) 完成时完成。我假设在你的情况下这个永远不会(它可能绑定到执行搜索的用户操作)。
不清楚的是何时 您希望PositionQueryCompleted
被调用。如果在处理完每一个成功的查询之后,那么你的流需要修改,因为 Switch
丢失了查询流完成的信息,但它也缺少关于 UI 的信息(错误的调度程序even) 说它的数据是否真的被处理过。
可能有其他方法可以实现它,但基本上您希望您的查询流完成 通过 Switch
生存 (目前忽略此事件)。例如,您可以将查询流转换为具有 n+1 个事件,并为完整的事件增加一个:
_performSearchSubject
.AsObservable()
.Select(_ =>
PerformQuery()
.Select(Data => new { Data, Complete = false})
.Concat(Observable.Return(new { Data = (string)null, Complete = true })))
您可以安全地申请.Switch().ObserveOn(_synchronizationContextService.SynchronizationContext)
,但是您需要修改您的订阅:
.Subscribe(data => {
if (data.Complete) DataArrivedForPositions(data.Data);
else PositionQueryCompleted()
}, PositionQueryError)
我在使用 Switch 语句订阅 Rx 时遇到问题。
_performSearchSubject
.AsObservable()
.Select(_ => PerformQuery())
.Switch()
.ObserveOn(_synchronizationContextService.SynchronizationContext)
.Subscribe(DataArrivedForPositions, PositionQueryError, PositionQueryCompleted)
.DisposeWith(this);
流量为:
- 一些属性发生变化,performSearchSubject.OnNext 被调用
- PerformPositionQuery() 被调用,returns 每次被击中时一个观察者
- 通过该观察者响应的服务在数据接收完成时调用两次 OnNext 和一次 OnCompleted
- 方法 DataArrivedForPositions 按预期被调用了两次
- 方法 PositionQueryCompleted 从未被调用,尽管在我的数据服务中调用了 observer.OnCompleted()。
dataService 的代码是:
protected override void Request(Request request, IObserver<Response> observer)
{
query.Arrive += p => QueryReceive(request.RequestId, p, observer, query);
query.Error += (type, s, message) => QueryError(observer, message);
query.NoMoreData += id => QueryCompleted(observer);
query.Execute(request);
}
private void QueryError(IObserver<PositionSheetResponse> observer, string message)
{
observer.OnError(new Exception(message));
}
private void QueryCompleted(IObserver<PositionSheetResponse> observer)
{
observer.OnCompleted();
}
private void QueryReceive(Guid requestId, Qry0079Receive receiveData, IObserver<PositionSheetResponse> observer, IQry0079PositionSheet query)
{
observer.OnNext(ConvertToResponse(requestId, receiveData));
}
Switch
结果只会在您的外部可观察对象 (_performSearchSubject
) 完成时完成。我假设在你的情况下这个永远不会(它可能绑定到执行搜索的用户操作)。
不清楚的是何时 您希望PositionQueryCompleted
被调用。如果在处理完每一个成功的查询之后,那么你的流需要修改,因为 Switch
丢失了查询流完成的信息,但它也缺少关于 UI 的信息(错误的调度程序even) 说它的数据是否真的被处理过。
可能有其他方法可以实现它,但基本上您希望您的查询流完成 通过 Switch
生存 (目前忽略此事件)。例如,您可以将查询流转换为具有 n+1 个事件,并为完整的事件增加一个:
_performSearchSubject
.AsObservable()
.Select(_ =>
PerformQuery()
.Select(Data => new { Data, Complete = false})
.Concat(Observable.Return(new { Data = (string)null, Complete = true })))
您可以安全地申请.Switch().ObserveOn(_synchronizationContextService.SynchronizationContext)
,但是您需要修改您的订阅:
.Subscribe(data => {
if (data.Complete) DataArrivedForPositions(data.Data);
else PositionQueryCompleted()
}, PositionQueryError)