获取第一个 IObservable 事件而不阻塞需要它的 thread/task

Get first IObservable event without blocking the thread/task that wants it

我正在考虑使用 IObservable 在 c# async 方法中的请求-响应环境中获取响应并替换一些基于回调的旧代码,但我发现如果一个值被推送 (Subject.OnNext) 到可观察对象,但 FirstAsync 尚未等待,那么 FirstAsync 永远不会收到该消息。

有没有直接的方法让它工作,而不需要第二次 task/thread 加同步?

public async Task<ResponseMessage> Send(RequestMessage message)
{
    var id = Guid.NewGuid();
    var ret = Inbound.FirstAsync((x) => x.id == id).Timeout(timeout); // Never even gets invoked if response is too fast
    await DoSendMessage(id, message);
    return await ret; // Will sometimes miss the event/message
}

// somewhere else reading the socket in a loop
// may or may not be the thread calling Send
Inbound = subject.AsObservable();
while (cond)
{
    ...
    subject.OnNext(message);
}

我不能在发送请求之前为 FirstAsync 添加 await,因为那样会阻止发送请求。

await 将订阅 observable。您可以通过调用 ToTask:

将订阅与 await 分开
public async Task<ResponseMessage> Send(RequestMessage message)
{
  var id = Guid.NewGuid();
  var ret = Inbound.FirstAsync((x) => x.id == id).Timeout(timeout).ToTask();
  await DoSendMessage(id, message);
  return await ret;
}

我仔细看了看,有一个非常简单的解决方案,只需将热转换为冷可观察。将 Subject 替换为 ReplaySubject。这是文章:http://www.introtorx.com/content/v1.0.10621.0/14_HotAndColdObservables.html.

Here is the explanation:

The Replay extension method allows you take an existing observable sequence and give it 'replay' semantics as per ReplaySubject. As a reminder, the ReplaySubject will cache all values so that any late subscribers will also get all of the values.