如何防止 Rx 测试挂起?

How do I prevent by Rx test from hanging?

我正在使用下面的简化测试用例重现我的 Rx 问题。下面的测试挂起。我确信这是我所缺少的一个小但基本的东西,但我不能把它放在上面。

   public class Service
   {
        private ISubject<double> _subject = new Subject<double>();
        public void Reset()
        {
            _subject.OnNext(0.0);
        }

        public IObservable<double> GetProgress()
        {
            return _subject;
        }
    }

    public class ObTest
    {
        [Fact]
        private async Task SimpleTest()
        {
            var service = new Service();

            var result = service.GetProgress().Take(1);

            var task = Task.Run(async () =>
            {
                service.Reset();
            });

            await result;
        }
    }

更新

我上面的尝试是将问题稍微简化并理解它。在我的例子中,GetProgress() 是发布下载进度的各种 Observables 的合并,其中之一 ObservablesSubject<double>,每次有人调用时都会发布 0删除下载的方法。

EnigmativityTheodor Zoulias 确定的竞争条件可能(??)在现实生活中发生。我显示一个试图获取进度的视图,但是,快速手指及时将其删除。

我需要多了解一点的是,如果再次开始下载(现在订阅已经发生,通过显示一个已经订阅的视图)并且有人再次删除它。

   public class Service
   {
        private ISubject<double> _deleteSubject = new Subject<double>();
        public void Reset()
        {
            _deleteSubject.OnNext(0.0);
        }

        public IObservable<double> GetProgress()
        {
            return _deleteSubject.Merge(downloadProgress);
        }
    }

您的代码没有挂起。它正在等待一个有时永远得不到值的可观察对象。

您有竞争条件。

Task.Run 有时会在 await result 创建对 observable 的订阅之前执行完毕 - 因此它永远看不到值。

试试这个代码:

private async Task SimpleTest()
{
    var service = new Service();

    var result = service.GetProgress().Take(1);

    var awaiter = result.GetAwaiter();

    var task = Task.Run(() =>
    {
        service.Reset();
    });

    await awaiter;
}

await result 行创建了对可观察对象的订阅。问题是通知 _subject.OnNext(0.0) 可能会在此订阅之前发生,在这种情况下,该值将在未被观察到的情况下传递,并且 await result 将继续永远等待通知。在这个特定的例子中,通知总是被错过,至少在我的电脑上是这样,因为订阅延迟了大约 30 毫秒(用 Stopwatch 测量),这比重置服务的任务所需的时间长完成,可能是因为 JITer 必须加载和编译一些与 RX 相关的程序集。当我通过在示例 运行 之前调用 new Subject<int>().FirstAsync().Subscribe() 进行热身时,情况发生了变化。在那种情况下,几乎总是会观察到通知,从而避免挂起。

我能想到两个解决这个问题的可靠方法。

  1. Enigmativity 建议的 ,在启动重置服务的任务之前创建可等待的订阅。这可以通过 GetAwaiterToTask.

  2. 来完成
  3. 使用 ReplaySubject<T> 而不是普通香草 Subject<T>

Represents an object that is both an observable sequence as well as an observer. Each notification is broadcasted to all subscribed and future observers, subject to buffer trimming policies.

ReplaySubject 将缓存该值,以便将来的订阅可以观察到它,从而消除竞争条件。您可以使用 1 的 bufferSize 初始化它以最小化缓冲区的内存占用。