Publish 和 SelectMany 在此查询中做了什么?

What are Publish and SelectMany doing in this query?

我有一个(或两个)反应式方法的新问题。 在我的场景中,我需要一个可观察的序列,该序列能够在第一个任务未完成时抑制其他发出的任务,并以如下形式结束:

Observable.Interval(TimeSpan.FromMilliseconds(200))
.Select(x => Observable.FromAsync(async () =>
{
    await Task.Delay(1000);
    // Simulating long running task
    Console.WriteLine(x);
}))
.Publish(x => x.FirstAsync().SelectMany(c => c).Repeat())
.Subscribe();

我试过 Google 但我真的无法解释一些事情:

  1. 首先,它是如何工作的?
  2. 阻止可观察对象到达订阅的 Reactive 序列到底是什么? Replay 到底做了什么?在这种情况下 Replay 不应该重播任务吗?或者我不知道。 任何人都可以详细解释该 Reactive 查询中的每个步骤吗? Publish 与那种选择器有什么关系。 Replay 在该查询中的表现如何?如果无论如何只发射一个元素,为什么我需要在 FirstAsync 上调用 SelectMany
  1. .SelectMany(c => c) 是 flatten/merge 嵌套序列的惯用方式。您可以将其替换为 .Merge(),查询的行为将相同。

  2. Publish 运算符,当与 Func<IObservable<TSource>, IObservable<TResult>> 参数一起使用时,订阅它所链接的查询,然后保持订阅状态,直到 lambda 生成的序列完成。因此,在您的情况下,通过将内部序列 x.FirstAsync().SelectMany(c => c).Replay() 包装在 Publish 中,您可以延迟取消订阅链式序列(Interval+Select+FromAsync ) 直到内部序列完成。内部序列永远不会完成,所以链式序列永远每秒产生一个冷 IObservable<Unit> 子序列。您可以通过在 Publish:

    之前拦截 Do 运算符来观察这种情况的发生
.Do(x => Console.WriteLine($"New subsequence: {x.GetType().Name}"))
  1. Replay运算符类似于Publish,区别在于Replay对过去的通知有记忆,而Publish没有任何记忆。我猜您的意图是附加 Repeat 而不是 Replay。不带参数的 Replay 会产生一个“可连接”的可观察对象,它不会自动订阅链式序列。您必须手动 Connect 它,或者将 RefCount 运算符附加到它。在您的情况下,您什么都不做,因此生成的序列永远不会发出任何东西,也永远不会完成。这是一个令人讨厌的 dead-lock 情况。