Take(1) from a Select() on a hot observable

Take(1) from a Select() on a hot observable

我想做的是拥有一个热可观察对象,然后通过 Select.

从中导出另一个可观察对象

接下来我想使用 await Take(1) 从派生的可观察对象中获取单个值,然后订阅它。

int i = 1;
var o1 = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Select(x => { i++;return i; })
    .Publish()
    .RefCount()
    .Do(x => Console.WriteLine($"emit {x}"));

var o2 = o1.Select(x => x + 5);

await o2.Take(1);
Console.ReadLine();

using (o2.Subscribe(x =>
 {
     Console.WriteLine($"output {x}");
 }))
{
    Console.WriteLine($"subscrbied");
    Console.ReadLine();
}
Console.ReadLine();

但是,我看到的是 await Take(1) 之后,可观察对象不再“有效”(不再打印“emit x”)。

这是为什么?

编辑

有趣的是,如果我添加一个 Task.Delay 它会起作用:

var o2 = o1.Select(x => x + 5);

await o2.Take(1);
await Task.Delay(1);
Console.ReadLine();

组合 .Publish().RefCount() 可能有点难用。有时订阅者归零,查询无法重新订阅。

但是,在这种情况下,似乎存在我尚未完全弄清楚的竞争条件。

以下是使您的代码正常工作的方法:

await o2.Take(1).ObserveOn(Scheduler.Default);

添加 ObserveOn 使其可以按照您期望的方式运行。

您对 await Task.Delay(1); 的使用也起到了同样的作用。但是为什么我还是很困惑。

最后,忽略让它工作的拼凑,它似乎工作的唯一原因是您正在使用外部状态 int i = 1;。您可以删除 .Publish().RefCount(),它仍会按预期工作。你应该避免这种外部状态,如果你确实使用它,你应该使用 Interlocked.Increment(ref i) 而不是 i++.


根据我们的讨论,这里有另一种获取所需内容的方法:

var subject = new ReplaySubject<long>(1);
var source = Observable.Interval(TimeSpan.FromSeconds(1.0));
var subscription = source.Subscribe(subject);
Thread.Sleep(TimeSpan.FromSeconds(4.5));
var z = await subject.Take(1);
Console.WriteLine($"0:{z}z");
Thread.Sleep(TimeSpan.FromSeconds(2.5));
subject.Take(5).Subscribe(x => Console.WriteLine($"1:{x}x"));
Thread.Sleep(TimeSpan.FromSeconds(2.5));
subject.Take(5).Subscribe(x => Console.WriteLine($"2:{x}x"));