获取第一个 IObservable 事件而不阻塞需要它的 thread/task
Get first IObservable event without blocking the thread/task that wants it
我正在考虑使用 IObservable
在 c# async
方法中的请求-响应环境中获取响应并替换一些基于回调的旧代码,但我发现如果一个值被推送 (Subject.OnNext
) 到可观察对象,但 FirstAsync
尚未等待,那么 FirstAsync
永远不会收到该消息。
有没有直接的方法让它工作,而不需要第二次 task/thread 加同步?
public async Task<ResponseMessage> Send(RequestMessage message)
{
var id = Guid.NewGuid();
var ret = Inbound.FirstAsync((x) => x.id == id).Timeout(timeout); // Never even gets invoked if response is too fast
await DoSendMessage(id, message);
return await ret; // Will sometimes miss the event/message
}
// somewhere else reading the socket in a loop
// may or may not be the thread calling Send
Inbound = subject.AsObservable();
while (cond)
{
...
subject.OnNext(message);
}
我不能在发送请求之前为 FirstAsync
添加 await
,因为那样会阻止发送请求。
await
将订阅 observable。您可以通过调用 ToTask
:
将订阅与 await
分开
public async Task<ResponseMessage> Send(RequestMessage message)
{
var id = Guid.NewGuid();
var ret = Inbound.FirstAsync((x) => x.id == id).Timeout(timeout).ToTask();
await DoSendMessage(id, message);
return await ret;
}
我仔细看了看,有一个非常简单的解决方案,只需将热转换为冷可观察。将 Subject
替换为 ReplaySubject
。这是文章:http://www.introtorx.com/content/v1.0.10621.0/14_HotAndColdObservables.html.
Here is the explanation:
The Replay extension method allows you take an existing observable
sequence and give it 'replay' semantics as per ReplaySubject. As a
reminder, the ReplaySubject will cache all values so that any late
subscribers will also get all of the values.
我正在考虑使用 IObservable
在 c# async
方法中的请求-响应环境中获取响应并替换一些基于回调的旧代码,但我发现如果一个值被推送 (Subject.OnNext
) 到可观察对象,但 FirstAsync
尚未等待,那么 FirstAsync
永远不会收到该消息。
有没有直接的方法让它工作,而不需要第二次 task/thread 加同步?
public async Task<ResponseMessage> Send(RequestMessage message)
{
var id = Guid.NewGuid();
var ret = Inbound.FirstAsync((x) => x.id == id).Timeout(timeout); // Never even gets invoked if response is too fast
await DoSendMessage(id, message);
return await ret; // Will sometimes miss the event/message
}
// somewhere else reading the socket in a loop
// may or may not be the thread calling Send
Inbound = subject.AsObservable();
while (cond)
{
...
subject.OnNext(message);
}
我不能在发送请求之前为 FirstAsync
添加 await
,因为那样会阻止发送请求。
await
将订阅 observable。您可以通过调用 ToTask
:
await
分开
public async Task<ResponseMessage> Send(RequestMessage message)
{
var id = Guid.NewGuid();
var ret = Inbound.FirstAsync((x) => x.id == id).Timeout(timeout).ToTask();
await DoSendMessage(id, message);
return await ret;
}
我仔细看了看,有一个非常简单的解决方案,只需将热转换为冷可观察。将 Subject
替换为 ReplaySubject
。这是文章:http://www.introtorx.com/content/v1.0.10621.0/14_HotAndColdObservables.html.
Here is the explanation:
The Replay extension method allows you take an existing observable sequence and give it 'replay' semantics as per ReplaySubject. As a reminder, the ReplaySubject will cache all values so that any late subscribers will also get all of the values.