控制台应用程序中 Observable 的 LastAsync() 死锁

Observable's LastAsync() deadlocks in console app

当使用 IObservable.LastAsync() 强制我的控制台应用程序等待使用 Flurl 的 API 调用的结果时,永远不会进行 API 调用并且主线程死锁并且永远不会returns 来自 LastAsync()。我的目标是:

  1. 由于这是一个控制台应用程序,我无法真正“订阅”API 调用,因为这将允许主线程继续,可能导致它在 [=28= 之前退出】 通话完毕。所以我需要阻塞直到获取到值。
  2. API 调用应延迟到第一个订阅者请求值。
  3. 第二个和以后的订阅者不应引起另一个 API 调用,而是应该返回流中的最后一个值(这是使用 Replay(1) 的目标)

这是一个重现该问题的示例:

public static class Program
{
    public static async Task Main(string[] args)
    {
        var obs = Observable.Defer(() =>
                "https://api.publicapis.org"
                    .AppendPathSegment("entries")
                    .GetJsonAsync()
                    .ToObservable())
            .Select(x => x.title)
            .Replay(1);

        var title = await obs.LastAsync();
        Console.WriteLine($"Title 1: {title}");
    }
}

如何修改示例以确保满足上述所有 3 个要求?为什么我的例子会导致死锁?

Replay returns“可连接”可观察,您需要对其调用 Connect() 方法以启动它。如果没有那个调用,它就不会订阅底层的可观察对象,也不会向自己的订阅者发送项目,所以这就是你遇到“死锁”的原因。

在这种情况下,您可以使用 RefCount() 扩展方法,而不是手动连接,该方法将在第一个订阅者自动连接,并在最后一个订阅者取消订阅时断开连接。所以:

public static async Task Main(string[] args) {
    var obs = Observable.Defer(() =>
            "https://api.publicapis.org"
                .AppendPathSegment("entries")
                .GetJsonAsync()
                .ToObservable())
        .Select(x => x.count)
        .Replay(1)
        .RefCount();

    // makes request
    var title = await obs.LastAsync();
    Console.WriteLine($"Title 1: {title}");
    // does not make request, obtains from replay cache
    title = await obs.LastAsync();
    Console.WriteLine($"Title 2: {title}");
}

你也可以使用AutoConnect方法:

.Replay(1)
.AutoConnect(1);

这将自动连接到第一个订阅者,但永远不会断开连接(在您的情况下无关紧要)。