使用 TestSchedulers、Rx 和 BlockingCollection 进行死锁测试

Deadlock testing with TestSchedulers, Rx and BlockingCollection

我有以下 class,它基本上订阅了一个 int 可观察值并将该值乘以 2。出于现实目的,我添加了一个 Thread.Sleep 到模拟一个繁重的处理。

public class WorkingClass
{
    private BlockingCollection<int> _collection = new BlockingCollection<int>(1);

    public WorkingClass(IObservable<int> rawValues)
    {
        rawValues.Subscribe(x => _collection.Add(x));
    }

    public IObservable<int> ProcessedValues()
    {
        return Observable.Create<int>(observer =>
        {
            while (true)
            {
                int value;

                try
                {
                    value = _collection.Take();
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                    break;
                }

                Thread.Sleep(1000); //Simulate long work
                observer.OnNext(value * 2);
            }

            return Disposable.Empty;
        });
    }
}

我在测试它时遇到问题,在下面的测试中我只想断言如果源流发出值 1,SUT 将发出值 2:

[Test]
public void SimpleTest()
{
    var sourceValuesScheduler = new TestScheduler();
    var newThreadScheduler = new TestScheduler();

    var source = sourceValuesScheduler.CreateHotObservable(
         new Recorded<Notification<int>>(1000, Notification.CreateOnNext(1)));

    var sut = new WorkingClass(source);

    var observer = sourceValuesScheduler.CreateObserver<int>();

    sut.ProcessedValues()
        .SubscribeOn(newThreadScheduler) //The cold part (i.e, the while loop) of the ProcessedValues Observable should run in a different thread
        .Subscribe(observer);

    sourceValuesScheduler.AdvanceTo(1000);

    observer.Messages.AssertEqual(new Recorded<Notification<int>>(1000, Notification.CreateOnNext(2)));
}

如果我 运行 此测试断言失败,因为 newThreadScheduler 从未启动,因此从未创建 ProcessedValues observable。如果我这样做:

 sourceValuesScheduler.AdvanceTo(1000);
 newThreadScheduler.AdvanceTo(1000); 

它也不起作用,因为 newThreadScheduler 使用与 sourceValuesScheduler 相同的线程,因此测试将在处理后的值发出后立即挂起,在行:

value = _collection.Take();

有没有办法让多个 TestScheduler 运行在不同的线程上运行?否则我怎么能像这样测试 classes?

Take() 会一直阻塞,直到有一个项目要从 BlockingCollection<int> 中删除或者您在其上调用 CompleteAdding()

鉴于您当前的实现,您订阅 ProcessedValues() 并执行 while 循环的线程将永远不会完成。

您应该在单独的线程上使用 BlockingCollection<int>。例如,您可以在调用 ProcessedValues() 时创建一个消耗 Task 。考虑以下实现,它也处理 BlockingCollection<int>:

public sealed class WorkingClass : IDisposable
{
    private BlockingCollection<int> _collection = new BlockingCollection<int>(1);
    private List<Task> _consumerTasks = new List<Task>();

    public WorkingClass(IObservable<int> rawValues)
    {
        rawValues.Subscribe(x => _collection.Add(x));
    }

    public IObservable<int> ProcessedValues()
    {
        return Observable.Create<int>(observer =>
        {
            _consumerTasks.Add(Task.Factory.StartNew(() => Consume(observer), TaskCreationOptions.LongRunning));
            return Disposable.Empty;
        });
    }

    private void Consume(IObserver<int> observer)
    {
        try
        {
            foreach (int value in _collection.GetConsumingEnumerable())
            {
                Thread.Sleep(1000); //Simulate long work
                observer.OnNext(value * 2);
            }
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
        }
    }

    public void Dispose()
    {
        _collection.CompleteAdding();
        Task.WaitAll(_consumerTasks.ToArray());
        _collection.Dispose();
    }
}

可以使用如下代码进行测试:

var sourceValuesScheduler = new TestScheduler();

var source = sourceValuesScheduler.CreateHotObservable(
    new Recorded<Notification<int>>(1000, Notification.CreateOnNext(1)));

var observer = sourceValuesScheduler.CreateObserver<int>();

using (var sut = new WorkingClass(source))
{
    sourceValuesScheduler.AdvanceTo(1000); //add to collection
    sut.ProcessedValues().Subscribe(observer); //consume
} //...and wait until the loop exists

observer.Messages.AssertEqual(new Recorded<Notification<int>>(1000, Notification.CreateOnNext(2)));