在反应管道中执行 TPL 代码并通过测试调度程序控制执行

Executing TPL code in a reactive pipeline and controlling execution via test scheduler

我正在努力弄清楚为什么以下测试不起作用:

[Fact]
public void repro()
{
    var scheduler = new TestScheduler();
    var count = 0;

    // this observable is a simplification of the system under test
    // I've just included it directly in the test for clarity
    // in reality it is NOT accessible from the test code - it is
    // an implementation detail of the system under test
    // but by passing in a TestScheduler to the sut, the test code
    // can theoretically control the execution of the pipeline
    // but per this question, that doesn't work when using FromAsync
    Observable
        .Return(1)
        .Select(i => Observable.FromAsync(Whatever))
        .Concat()
        .ObserveOn(scheduler)
        .Subscribe(_ => Interlocked.Increment(ref count));

    Assert.Equal(0, count);

    // this call initiates the observable pipeline, but does not
    // wait until the entire pipeline has been executed before
    // returning control to the caller
    // the question is: why? Rx knows I'm instigating an async task
    // as part of the pipeline (that's the point of the FromAsync
    // method), so why can't it still treat the pipeline atomically
    // when I call Start() on the scheduler?
    scheduler.Start();

    // count is still zero at this point
    Assert.Equal(1, count);
}

private async Task<Unit> Whatever()
{
    await Task.Delay(100);
    return Unit.Default;
}

我想要做的是 运行 一些异步代码(在上面由 Whatever() 表示)每当一个可观察的滴答声。重要的是,我希望这些呼叫排队。更重要的是,我希望能够通过使用 TestScheduler.

来控制管道的执行

似乎对 scheduler.Start() 的调用正在煽动 Whatever() 的执行,但它并没有等到它完成。如果我更改 Whatever() 使其同步:

private async Task<Unit> Whatever()
{
    //await Task.Delay(100);
    return Unit.Default;
}

然后测试通过了,但这当然违背了我想要达到的目的。我可以想象 TestScheduler 上有一个我可以等待的 StartAsync() 方法,但它不存在。

任何人都可以告诉我是否有办法让我发起反应管道的执行并等待其完成,即使它包含异步调用也是如此?

Noseratio 编写此测试的更优雅的 Rx 方式。您可以 await observables 来获取它们的最后一个值。结合 Count() 就变得微不足道了。

请注意,TestScheduler 在此示例中没有任何作用。

[Fact]
public async Task repro()
{
    var scheduler = new TestScheduler();

    var countObs = Observable
        .Return(1)
        .Select(i => Observable.FromAsync(Whatever))
        .Concat()
        //.ObserveOn(scheduler) // serves no purpose in this test
        .Count();

    Assert.Equal(0, count);
    //scheduler.Start(); // serves no purpose in this test.

    var count = await countObs;

    Assert.Equal(1, count);
}

让我把你的问题归结为它的要点:

Is there a way, using the TestScheduler, to execute a reactive pipeline and wait for its completion even when it contains asynchronous calls?

我应该提前警告你,这里没有快速简单的答案,没有方便的"trick"可以部署。

异步调用和调度程序

为了回答这个问题,我认为我们需要澄清一些要点。上述问题中的术语 "asynchronous call" 似乎专门用于指代具有 TaskTask<T> 签名的方法 - 即使用任务并行库 (TPL) 的方法 运行 异步。

这一点很重要,因为 Reactive Extensions (Rx) 采用不同的方法来处理异步操作。

在 Rx 中,并发的引入是通过调度程序管理的,调度程序是一种实现 IScheduler 接口的类型。任何引入并发性的操作 应该 提供一个可用的调度程序参数,以便调用者可以决定合适的调度程序。核心库严格遵守这一原则。因此,例如,Delay 允许指定调度程序,但 Where 不允许。

正如您从 the source 中看到的那样,IScheduler 提供了许多 Schedule 重载。需要并发的操作使用这些来安排工作的执行。确切地 如何 执行工作完全推迟到调度程序。这就是调度程序抽象的力量。

引入并发的 Rx 操作通常提供允许省略调度程序的重载,在这种情况下 select 一个合理的默认值。这一点很重要,因为如果你希望你的代码可以通过使用 TestScheduler 进行测试,你 必须 all 使用 TestScheduler 引入并发的操作。不允许这样做的流氓方法可能会破坏您的测试工作。

TPL 调度抽象

TPL 有自己的抽象来处理并发:TaskScheduler。这个想法非常相似。 You can read about it here..

两种抽象之间有两个非常重要的区别:

  • Rx 调度程序第一个 class 表示他们自己的时间概念 - Now 属性。 TPL 调度程序没有。
  • TPL 中自定义调度程序的使用要少得多,并且没有等效的最佳实践来提供重载以向引入并发的方法提供特定 TaskSchedulers(返回 TaskTask<T>)。绝大多数 Task 返回方法假定使用默认值 TaskScheduler,并且您无法选择工作地点 运行。

TestScheduler 的动机

使用 TestScheduler 的动机通常有两个:

  • 通过加快时间来消除 "wait" 操作的需要。
  • 检查事件是否在预期的时间点发生。

它的工作方式完全取决于调度程序有自己的时间概念这一事实。每次通过 IScheduler 安排操作时,我们指定 何时 它必须执行 - 尽快或在将来的特定时间执行。然后调度程序将工作排队等待执行,并在达到指定时间(根据调度程序本身)时执行它。

当您在 TestScheduler 上调用 Start 时,它的工作方式是清空执行时间等于或早于其当前概念 Now 的所有操作的队列 - 然后推进其计时到下一个预定的工作时间并重复直到其队列为空。

这允许巧妙的技巧,比如能够测试一个操作 永远不会 导致事件!如果使用实时,这将是一项具有挑战性的任务,但使用虚拟时间,这很容易 - 一旦调度程序队列完全为空,然后 TestScheduler 得出结论,不会再发生任何事件 - 因为如果它的队列中没有任何东西,没有什么可以安排进一步的任务。其实Startreturns此时恰恰就是这一点。为此,显然所有要测量的并发操作都必须安排在 TestScheduler.

自定义运算符粗心地选择了自己的调度程序而不允许覆盖该选择,或者使用自己的并发形式而没有时间概念的操作(例如基于 TPL 的调用)将使它变得困难,如果不是不可能的话,通过 TestScheduler.

来控制执行

如果您通过其他方式进行异步操作 运行,明智地使用 TestSchedulerAdvanceToAdvanceBy 方法可以让您与之协调并发的外部来源 - 但可实现的程度取决于该外部来源提供的控制。

在 TPL 的情况下,您确实知道任务何时完成 - 这确实允许在测试中使用等待和超时,尽管这些可能很丑陋。通过使用 TaskCompleteSources(TCS) 您可以模拟任务并使用 AdvanceTo 命中特定点并完成 TCS,但这里没有一种简单的方法。通常你不得不求助于丑陋的等待和超时,因为你对外部并发没有足够的控制。

Rx 通常是自由线程的,并尽可能避免引入并发。相反,Rx 调用链中的不同操作很可能需要不同类型的调度程序抽象。并非总是可以使用单个测试调度程序来模拟调用链。当然,我有理由使用多个 TestSchedulers 来模拟一些复杂的场景——例如使用 DispatcherSchedulerTaskScheduler 的链有时需要复杂的协调,这意味着您不能简单地将它们的操作序列化到一个 TestScheduler.

我参与的一些项目强制使用 Rx 实现 所有 并发,专门用于避免这些问题。这并不总是可行的,即使在这些情况下,TPL 的使用通常也是不可避免的。

一个特定的痛点

让许多测试人员摸不着头脑的 Rx 的一个特殊痛点是 TPL -> Rx 系列转换引入了并发性。例如ToObservableSelectMany 的重载接受 Task<T> 等不提供带有调度程序的重载并阴险地迫使您离开 TestScheduler 线程,即使使用 TCS 进行模拟也是如此。对于仅在测试中造成的所有痛苦,我认为这是一个错误。您可以阅读所有相关内容 here - dig through and you'll find Dave Sexton's proposed fix, which provides an overload for specifying a scheduler, and is under consideration for inclusion. You may want to look into that pull request.

一个潜在的解决方法

如果您可以编辑代码以使用它,则以下辅助方法可能会有用。它将任务转换为将 运行 在 TestScheduler 上并在正确的虚拟时间完成的可观察对象。

它在负责收集任务结果的 TestScheduler 上安排工作 - 在我们声明任务应该完成的虚拟时间。工作本身会阻塞,直到任务结果可用 - 允许 TPL 任务 运行 无论花费多长时间,或者直到实际指定的时间已经过去,在这种情况下会抛出 TimeoutException

阻止工作的效果意味着 TestScheduler 不会将其虚拟时间提前到超过任务的预期虚拟完成时间,直到任务 实际 完全的。这样,Rx 链的其余部分可以 运行 全速虚拟时间,我们只等待 TPL 任务,在任务完成虚拟时间暂停链的其余部分。

重要的是,安排在基于任务的操作的虚拟开始时间和任务的规定结束虚拟时间之间的其他并发 Rx 操作 运行 不会被阻塞,它们的虚拟完成时间也不会受到影响。

因此,将 duration 设置为您希望任务显示已花费的虚拟时间长度。然后将在任务启动时的虚拟时间加上指定的持续时间收集结果。

timeout 设置为您允许任务完成的实际时间。如果时间更长,则抛出超时异常:

public static IObservable<T> ToTestScheduledObseravble<T>(
    this Task<T> task,
    TestScheduler scheduler,
    TimeSpan duration,
    TimeSpan? timeout = null)
{   

    timeout = timeout ?? TimeSpan.FromSeconds(100);
    var subject = Subject.Synchronize(new AsyncSubject<T>(), scheduler);              

    scheduler.Schedule<Task<T>>(task, duration,
        (s, t) => {
            if (!task.Wait(timeout.Value))
            {           
                subject.OnError(
                    new TimeoutException(
                    "Task duration too long"));                        
            }
            else
            {
                switch (task.Status)
                {
                    case TaskStatus.RanToCompletion:
                        subject.OnNext(task.Result);
                        subject.OnCompleted();
                        break;
                    case TaskStatus.Faulted:
                        subject.OnError(task.Exception.InnerException);
                        break;
                    case TaskStatus.Canceled:
                        subject.OnError(new TaskCanceledException(task));
                        break;
                }
            }

            return Disposable.Empty;
        });

    return subject.AsObservable();
}

在你的代码中的用法是这样的,你的断言将通过:

Observable
    .Return(1)
    .Select(i => Whatever().ToTestScheduledObseravble(
        scheduler, TimeSpan.FromSeconds(1)))
    .Concat()
    .Subscribe(_ => Interlocked.Increment(ref count));

结论

总而言之,您没有错过任何方便的技巧。您需要考虑 Rx 的工作方式,以及 TPL 的工作方式,并决定是否:

  • 您避免混合 TPL 和 Rx
  • 你模拟 TPL 和 Rx 之间的接口(使用 TCS 或类似的),所以你独立测试每个
  • 你忍受丑陋的等待和超时并完全放弃 TestScheduler
  • 您将丑陋的等待和超时与 TestScheduler 混合在一起,以便对您的测试进行一些控制。

正如 James 上面提到的,您不能像现在这样混合使用并发模型。您使用 TestScheduler 从 Rx 中删除并发性,但从未真正通过 Rx 引入并发性。但是,您确实引入了 TPL 的并发性(即 Task.Delay(100)。这里实际上将 运行 在任务池线程上异步进行。因此您的同步测试将在任务完成之前完成。

你可以改成这样

[Fact]
public void repro()
{
    var scheduler = new TestScheduler();
    var count = 0;

    // this observable is a simplification of the system under test
    // I've just included it directly in the test for clarity
    // in reality it is NOT accessible from the test code - it is
    // an implementation detail of the system under test
    // but by passing in a TestScheduler to the sut, the test code
    // can theoretically control the execution of the pipeline
    // but per this question, that doesn't work when using FromAsync
    Observable
        .Return(1)
        .Select(_ => Observable.FromAsync(()=>Whatever(scheduler)))
        .Concat()
        .ObserveOn(scheduler)
        .Subscribe(_ => Interlocked.Increment(ref count));

    Assert.Equal(0, count);

    // this call initiates the observable pipeline, but does not
    // wait until the entire pipeline has been executed before
    // returning control to the caller
    // the question is: why? Rx knows I'm instigating an async task
    // as part of the pipeline (that's the point of the FromAsync
    // method), so why can't it still treat the pipeline atomically
    // when I call Start() on the scheduler?
    scheduler.Start();

    // count is still zero at this point
    Assert.Equal(1, count);
}

private async Task<Unit> Whatever(IScheduler scheduler)
{
    return await Observable.Timer(TimeSpan.FromMilliseconds(100), scheduler).Select(_=>Unit.Default).ToTask();
}

或者,您需要将 Whatever 方法放在一个接口后面,您可以模拟该接口进行测试。在这种情况下,您只需要 Stub/Mock/Double return 上面的代码,即 return await Observable.Timer(TimeSpan.FromMilliseconds(100), scheduler).Select(_=>Unit.Default).ToTask();