控制台应用程序中 Observable 的 LastAsync() 死锁
Observable's LastAsync() deadlocks in console app
当使用 IObservable.LastAsync()
强制我的控制台应用程序等待使用 Flurl 的 API 调用的结果时,永远不会进行 API 调用并且主线程死锁并且永远不会returns 来自 LastAsync()
。我的目标是:
- 由于这是一个控制台应用程序,我无法真正“订阅”API 调用,因为这将允许主线程继续,可能导致它在 [=28= 之前退出】 通话完毕。所以我需要阻塞直到获取到值。
- API 调用应延迟到第一个订阅者请求值。
- 第二个和以后的订阅者不应引起另一个 API 调用,而是应该返回流中的最后一个值(这是使用
Replay(1)
的目标)
这是一个重现该问题的示例:
public static class Program
{
public static async Task Main(string[] args)
{
var obs = Observable.Defer(() =>
"https://api.publicapis.org"
.AppendPathSegment("entries")
.GetJsonAsync()
.ToObservable())
.Select(x => x.title)
.Replay(1);
var title = await obs.LastAsync();
Console.WriteLine($"Title 1: {title}");
}
}
如何修改示例以确保满足上述所有 3 个要求?为什么我的例子会导致死锁?
Replay
returns“可连接”可观察,您需要对其调用 Connect()
方法以启动它。如果没有那个调用,它就不会订阅底层的可观察对象,也不会向自己的订阅者发送项目,所以这就是你遇到“死锁”的原因。
在这种情况下,您可以使用 RefCount()
扩展方法,而不是手动连接,该方法将在第一个订阅者自动连接,并在最后一个订阅者取消订阅时断开连接。所以:
public static async Task Main(string[] args) {
var obs = Observable.Defer(() =>
"https://api.publicapis.org"
.AppendPathSegment("entries")
.GetJsonAsync()
.ToObservable())
.Select(x => x.count)
.Replay(1)
.RefCount();
// makes request
var title = await obs.LastAsync();
Console.WriteLine($"Title 1: {title}");
// does not make request, obtains from replay cache
title = await obs.LastAsync();
Console.WriteLine($"Title 2: {title}");
}
你也可以使用AutoConnect
方法:
.Replay(1)
.AutoConnect(1);
这将自动连接到第一个订阅者,但永远不会断开连接(在您的情况下无关紧要)。
当使用 IObservable.LastAsync()
强制我的控制台应用程序等待使用 Flurl 的 API 调用的结果时,永远不会进行 API 调用并且主线程死锁并且永远不会returns 来自 LastAsync()
。我的目标是:
- 由于这是一个控制台应用程序,我无法真正“订阅”API 调用,因为这将允许主线程继续,可能导致它在 [=28= 之前退出】 通话完毕。所以我需要阻塞直到获取到值。
- API 调用应延迟到第一个订阅者请求值。
- 第二个和以后的订阅者不应引起另一个 API 调用,而是应该返回流中的最后一个值(这是使用
Replay(1)
的目标)
这是一个重现该问题的示例:
public static class Program
{
public static async Task Main(string[] args)
{
var obs = Observable.Defer(() =>
"https://api.publicapis.org"
.AppendPathSegment("entries")
.GetJsonAsync()
.ToObservable())
.Select(x => x.title)
.Replay(1);
var title = await obs.LastAsync();
Console.WriteLine($"Title 1: {title}");
}
}
如何修改示例以确保满足上述所有 3 个要求?为什么我的例子会导致死锁?
Replay
returns“可连接”可观察,您需要对其调用 Connect()
方法以启动它。如果没有那个调用,它就不会订阅底层的可观察对象,也不会向自己的订阅者发送项目,所以这就是你遇到“死锁”的原因。
在这种情况下,您可以使用 RefCount()
扩展方法,而不是手动连接,该方法将在第一个订阅者自动连接,并在最后一个订阅者取消订阅时断开连接。所以:
public static async Task Main(string[] args) {
var obs = Observable.Defer(() =>
"https://api.publicapis.org"
.AppendPathSegment("entries")
.GetJsonAsync()
.ToObservable())
.Select(x => x.count)
.Replay(1)
.RefCount();
// makes request
var title = await obs.LastAsync();
Console.WriteLine($"Title 1: {title}");
// does not make request, obtains from replay cache
title = await obs.LastAsync();
Console.WriteLine($"Title 2: {title}");
}
你也可以使用AutoConnect
方法:
.Replay(1)
.AutoConnect(1);
这将自动连接到第一个订阅者,但永远不会断开连接(在您的情况下无关紧要)。