Take(1) from a Select() on a hot observable
Take(1) from a Select() on a hot observable
我想做的是拥有一个热可观察对象,然后通过 Select.
从中导出另一个可观察对象
接下来我想使用 await Take(1)
从派生的可观察对象中获取单个值,然后订阅它。
int i = 1;
var o1 = Observable
.Interval(TimeSpan.FromSeconds(1))
.Select(x => { i++;return i; })
.Publish()
.RefCount()
.Do(x => Console.WriteLine($"emit {x}"));
var o2 = o1.Select(x => x + 5);
await o2.Take(1);
Console.ReadLine();
using (o2.Subscribe(x =>
{
Console.WriteLine($"output {x}");
}))
{
Console.WriteLine($"subscrbied");
Console.ReadLine();
}
Console.ReadLine();
但是,我看到的是 await Take(1)
之后,可观察对象不再“有效”(不再打印“emit x”)。
这是为什么?
编辑
有趣的是,如果我添加一个 Task.Delay
它会起作用:
var o2 = o1.Select(x => x + 5);
await o2.Take(1);
await Task.Delay(1);
Console.ReadLine();
组合 .Publish().RefCount()
可能有点难用。有时订阅者归零,查询无法重新订阅。
但是,在这种情况下,似乎存在我尚未完全弄清楚的竞争条件。
以下是使您的代码正常工作的方法:
await o2.Take(1).ObserveOn(Scheduler.Default);
添加 ObserveOn
使其可以按照您期望的方式运行。
您对 await Task.Delay(1);
的使用也起到了同样的作用。但是为什么我还是很困惑。
最后,忽略让它工作的拼凑,它似乎工作的唯一原因是您正在使用外部状态 int i = 1;
。您可以删除 .Publish().RefCount()
,它仍会按预期工作。你应该避免这种外部状态,如果你确实使用它,你应该使用 Interlocked.Increment(ref i)
而不是 i++
.
根据我们的讨论,这里有另一种获取所需内容的方法:
var subject = new ReplaySubject<long>(1);
var source = Observable.Interval(TimeSpan.FromSeconds(1.0));
var subscription = source.Subscribe(subject);
Thread.Sleep(TimeSpan.FromSeconds(4.5));
var z = await subject.Take(1);
Console.WriteLine($"0:{z}z");
Thread.Sleep(TimeSpan.FromSeconds(2.5));
subject.Take(5).Subscribe(x => Console.WriteLine($"1:{x}x"));
Thread.Sleep(TimeSpan.FromSeconds(2.5));
subject.Take(5).Subscribe(x => Console.WriteLine($"2:{x}x"));
我想做的是拥有一个热可观察对象,然后通过 Select.
从中导出另一个可观察对象接下来我想使用 await Take(1)
从派生的可观察对象中获取单个值,然后订阅它。
int i = 1;
var o1 = Observable
.Interval(TimeSpan.FromSeconds(1))
.Select(x => { i++;return i; })
.Publish()
.RefCount()
.Do(x => Console.WriteLine($"emit {x}"));
var o2 = o1.Select(x => x + 5);
await o2.Take(1);
Console.ReadLine();
using (o2.Subscribe(x =>
{
Console.WriteLine($"output {x}");
}))
{
Console.WriteLine($"subscrbied");
Console.ReadLine();
}
Console.ReadLine();
但是,我看到的是 await Take(1)
之后,可观察对象不再“有效”(不再打印“emit x”)。
这是为什么?
编辑
有趣的是,如果我添加一个 Task.Delay
它会起作用:
var o2 = o1.Select(x => x + 5);
await o2.Take(1);
await Task.Delay(1);
Console.ReadLine();
组合 .Publish().RefCount()
可能有点难用。有时订阅者归零,查询无法重新订阅。
但是,在这种情况下,似乎存在我尚未完全弄清楚的竞争条件。
以下是使您的代码正常工作的方法:
await o2.Take(1).ObserveOn(Scheduler.Default);
添加 ObserveOn
使其可以按照您期望的方式运行。
您对 await Task.Delay(1);
的使用也起到了同样的作用。但是为什么我还是很困惑。
最后,忽略让它工作的拼凑,它似乎工作的唯一原因是您正在使用外部状态 int i = 1;
。您可以删除 .Publish().RefCount()
,它仍会按预期工作。你应该避免这种外部状态,如果你确实使用它,你应该使用 Interlocked.Increment(ref i)
而不是 i++
.
根据我们的讨论,这里有另一种获取所需内容的方法:
var subject = new ReplaySubject<long>(1);
var source = Observable.Interval(TimeSpan.FromSeconds(1.0));
var subscription = source.Subscribe(subject);
Thread.Sleep(TimeSpan.FromSeconds(4.5));
var z = await subject.Take(1);
Console.WriteLine($"0:{z}z");
Thread.Sleep(TimeSpan.FromSeconds(2.5));
subject.Take(5).Subscribe(x => Console.WriteLine($"1:{x}x"));
Thread.Sleep(TimeSpan.FromSeconds(2.5));
subject.Take(5).Subscribe(x => Console.WriteLine($"2:{x}x"));