Publish 和 SelectMany 在此查询中做了什么?
What are Publish and SelectMany doing in this query?
我有一个(或两个)反应式方法的新问题。
在我的场景中,我需要一个可观察的序列,该序列能够在第一个任务未完成时抑制其他发出的任务,并以如下形式结束:
Observable.Interval(TimeSpan.FromMilliseconds(200))
.Select(x => Observable.FromAsync(async () =>
{
await Task.Delay(1000);
// Simulating long running task
Console.WriteLine(x);
}))
.Publish(x => x.FirstAsync().SelectMany(c => c).Repeat())
.Subscribe();
我试过 Google 但我真的无法解释一些事情:
- 首先,它是如何工作的?
- 阻止可观察对象到达订阅的 Reactive 序列到底是什么?
Replay
到底做了什么?在这种情况下 Replay
不应该重播任务吗?或者我不知道。
任何人都可以详细解释该 Reactive 查询中的每个步骤吗?
Publish
与那种选择器有什么关系。 Replay
在该查询中的表现如何?如果无论如何只发射一个元素,为什么我需要在 FirstAsync
上调用 SelectMany
。
.SelectMany(c => c)
是 flatten/merge 嵌套序列的惯用方式。您可以将其替换为 .Merge()
,查询的行为将相同。
Publish
运算符,当与 Func<IObservable<TSource>, IObservable<TResult>>
参数一起使用时,订阅它所链接的查询,然后保持订阅状态,直到 lambda 生成的序列完成。因此,在您的情况下,通过将内部序列 x.FirstAsync().SelectMany(c => c).Replay()
包装在 Publish
中,您可以延迟取消订阅链式序列(Interval
+Select
+FromAsync
) 直到内部序列完成。内部序列永远不会完成,所以链式序列永远每秒产生一个冷 IObservable<Unit>
子序列。您可以通过在 Publish
:
之前拦截 Do
运算符来观察这种情况的发生
.Do(x => Console.WriteLine($"New subsequence: {x.GetType().Name}"))
Replay
运算符类似于Publish
,区别在于Replay
对过去的通知有记忆,而Publish
没有任何记忆。我猜您的意图是附加 Repeat
而不是 Replay
。不带参数的 Replay
会产生一个“可连接”的可观察对象,它不会自动订阅链式序列。您必须手动 Connect
它,或者将 RefCount
运算符附加到它。在您的情况下,您什么都不做,因此生成的序列永远不会发出任何东西,也永远不会完成。这是一个令人讨厌的 dead-lock 情况。
我有一个(或两个)反应式方法的新问题。 在我的场景中,我需要一个可观察的序列,该序列能够在第一个任务未完成时抑制其他发出的任务,并以如下形式结束:
Observable.Interval(TimeSpan.FromMilliseconds(200))
.Select(x => Observable.FromAsync(async () =>
{
await Task.Delay(1000);
// Simulating long running task
Console.WriteLine(x);
}))
.Publish(x => x.FirstAsync().SelectMany(c => c).Repeat())
.Subscribe();
我试过 Google 但我真的无法解释一些事情:
- 首先,它是如何工作的?
- 阻止可观察对象到达订阅的 Reactive 序列到底是什么?
Replay
到底做了什么?在这种情况下Replay
不应该重播任务吗?或者我不知道。 任何人都可以详细解释该 Reactive 查询中的每个步骤吗?Publish
与那种选择器有什么关系。Replay
在该查询中的表现如何?如果无论如何只发射一个元素,为什么我需要在FirstAsync
上调用SelectMany
。
.SelectMany(c => c)
是 flatten/merge 嵌套序列的惯用方式。您可以将其替换为.Merge()
,查询的行为将相同。
之前拦截Publish
运算符,当与Func<IObservable<TSource>, IObservable<TResult>>
参数一起使用时,订阅它所链接的查询,然后保持订阅状态,直到 lambda 生成的序列完成。因此,在您的情况下,通过将内部序列x.FirstAsync().SelectMany(c => c).Replay()
包装在Publish
中,您可以延迟取消订阅链式序列(Interval
+Select
+FromAsync
) 直到内部序列完成。内部序列永远不会完成,所以链式序列永远每秒产生一个冷IObservable<Unit>
子序列。您可以通过在Publish
:Do
运算符来观察这种情况的发生
.Do(x => Console.WriteLine($"New subsequence: {x.GetType().Name}"))
Replay
运算符类似于Publish
,区别在于Replay
对过去的通知有记忆,而Publish
没有任何记忆。我猜您的意图是附加Repeat
而不是Replay
。不带参数的Replay
会产生一个“可连接”的可观察对象,它不会自动订阅链式序列。您必须手动Connect
它,或者将RefCount
运算符附加到它。在您的情况下,您什么都不做,因此生成的序列永远不会发出任何东西,也永远不会完成。这是一个令人讨厌的 dead-lock 情况。