RX IObserver 订阅时间

RX IObserver subscription timing

我是 .net 中 Rx 的新手,但已经开始使用它并成功完成了一些网络通信

非常 简化示例:

    IObservable<Result> SendRequest(Request request)
    {
        return Observable.Create<Result>(observer =>
        {
            NetworkComms.SendReqeust(request,
                result =>
                {
                    observer.OnNext(result);
                    observer.OnCompleted();
                });
            return Disposable.Empty;
        }).SubscribeOn(_commsScheduler);
    }

我遇到的问题是命令 (NetworkComms.SendRequest) 在调用者订阅返回的 IObservable 后才真正发送。在某些情况下,请求是一劳永逸的命令,结果毫无意义,因此调用者实际订阅返回的 IObservable 毫无意义。

我需要的功能是:

  1. 命令立即发送,即使调用者从未订阅 IObservable
  2. 如果当客户端订阅时,即使他们晚订阅也会得到结果
  3. 命令只发送一次,但所有订阅都应该得到相同的结果。

我尝试使用 .Replay().RefCount() 执行此操作并在返回 IObservable 之前在内部执行 Subscribe() 。这几乎可以工作,但是如果客户端在 之后订阅 收到的结果(因此在完成自动处理序列之后),它会导致再次调用订阅代码,第二次发送命令。

是否有可以为我处理这种情况的简单 Rx 扩展,或者我需要自己动手?

您似乎想使用 AsyncSubject<T>:

IObservable<Result> SendRequest(Request request)
{
    var subject = new AsyncSubject<Result>();
    NetworkComms.SendReqeust(request, result =>
    {
        subject.OnNext(result);
        subject.OnCompleted();
    });
    return subject.AsObservable();
}

我要补充一点,您想要的界面有点奇怪。如果语义是 "The moment I call this method, a request is made, and anybody who wants to can get the response at a later time," 那么考虑只使用 Task<Result> 而不是 IObservable<Result>.

Task<Result> SendRequestAsync(Request request)
{
    var tcs = new TaskCompletionSource<Result>();
    NetworkComms.SendReqeust(request, result => tcs.SetResult(result));
    return tcs.Task;
}

此外,考虑将此 Task<Result> SendRequestAsync(Request request) 方法直接放在您的 NetworkComms class.