RX IObserver 订阅时间
RX IObserver subscription timing
我是 .net 中 Rx 的新手,但已经开始使用它并成功完成了一些网络通信
非常 简化示例:
IObservable<Result> SendRequest(Request request)
{
return Observable.Create<Result>(observer =>
{
NetworkComms.SendReqeust(request,
result =>
{
observer.OnNext(result);
observer.OnCompleted();
});
return Disposable.Empty;
}).SubscribeOn(_commsScheduler);
}
我遇到的问题是命令 (NetworkComms.SendRequest) 在调用者订阅返回的 IObservable 后才真正发送。在某些情况下,请求是一劳永逸的命令,结果毫无意义,因此调用者实际订阅返回的 IObservable 毫无意义。
我需要的功能是:
- 命令立即发送,即使调用者从未订阅 IObservable
- 如果当客户端订阅时,即使他们晚订阅也会得到结果
- 命令只发送一次,但所有订阅都应该得到相同的结果。
我尝试使用 .Replay().RefCount() 执行此操作并在返回 IObservable 之前在内部执行 Subscribe() 。这几乎可以工作,但是如果客户端在 之后订阅 收到的结果(因此在完成自动处理序列之后),它会导致再次调用订阅代码,第二次发送命令。
是否有可以为我处理这种情况的简单 Rx 扩展,或者我需要自己动手?
您似乎想使用 AsyncSubject<T>
:
IObservable<Result> SendRequest(Request request)
{
var subject = new AsyncSubject<Result>();
NetworkComms.SendReqeust(request, result =>
{
subject.OnNext(result);
subject.OnCompleted();
});
return subject.AsObservable();
}
我要补充一点,您想要的界面有点奇怪。如果语义是 "The moment I call this method, a request is made, and anybody who wants to can get the response at a later time," 那么考虑只使用 Task<Result>
而不是 IObservable<Result>
.
Task<Result> SendRequestAsync(Request request)
{
var tcs = new TaskCompletionSource<Result>();
NetworkComms.SendReqeust(request, result => tcs.SetResult(result));
return tcs.Task;
}
此外,考虑将此 Task<Result> SendRequestAsync(Request request)
方法直接放在您的 NetworkComms
class.
我是 .net 中 Rx 的新手,但已经开始使用它并成功完成了一些网络通信
非常 简化示例:
IObservable<Result> SendRequest(Request request)
{
return Observable.Create<Result>(observer =>
{
NetworkComms.SendReqeust(request,
result =>
{
observer.OnNext(result);
observer.OnCompleted();
});
return Disposable.Empty;
}).SubscribeOn(_commsScheduler);
}
我遇到的问题是命令 (NetworkComms.SendRequest) 在调用者订阅返回的 IObservable 后才真正发送。在某些情况下,请求是一劳永逸的命令,结果毫无意义,因此调用者实际订阅返回的 IObservable 毫无意义。
我需要的功能是:
- 命令立即发送,即使调用者从未订阅 IObservable
- 如果当客户端订阅时,即使他们晚订阅也会得到结果
- 命令只发送一次,但所有订阅都应该得到相同的结果。
我尝试使用 .Replay().RefCount() 执行此操作并在返回 IObservable 之前在内部执行 Subscribe() 。这几乎可以工作,但是如果客户端在 之后订阅 收到的结果(因此在完成自动处理序列之后),它会导致再次调用订阅代码,第二次发送命令。
是否有可以为我处理这种情况的简单 Rx 扩展,或者我需要自己动手?
您似乎想使用 AsyncSubject<T>
:
IObservable<Result> SendRequest(Request request)
{
var subject = new AsyncSubject<Result>();
NetworkComms.SendReqeust(request, result =>
{
subject.OnNext(result);
subject.OnCompleted();
});
return subject.AsObservable();
}
我要补充一点,您想要的界面有点奇怪。如果语义是 "The moment I call this method, a request is made, and anybody who wants to can get the response at a later time," 那么考虑只使用 Task<Result>
而不是 IObservable<Result>
.
Task<Result> SendRequestAsync(Request request)
{
var tcs = new TaskCompletionSource<Result>();
NetworkComms.SendReqeust(request, result => tcs.SetResult(result));
return tcs.Task;
}
此外,考虑将此 Task<Result> SendRequestAsync(Request request)
方法直接放在您的 NetworkComms
class.