如何对每个订阅进行单元测试 Rx.Net

How to unit test Rx.Net foreach subscription

所有消息都应该发布到消息总线:

 upstream.Get().ForEachAsync(async e => await _bus.Publish(e, cancellationToken));

我想进行单元测试以验证发布方法是否被正确调用:

    [Theory, AutoMoqData]
    public void Publish_ShouldPublishAllDataSources(
        [Frozen]Mock<IBus> bus,
        [Frozen]Mock<IUpstream> upstream,
        SUT sut,
        TestScheduler scheduler,
        CancellationToken cancellationToken
        )
    {
        {
            Arrange();
            Act();
            Assert();
        }
        ITestableObservable<Event> inputObservable;

        void Arrange()
        {
            inputObservable =
                    scheduler.CreateColdObservable(
                        OnNext(1L,new Event("e1")),
                        OnNext(1L,new Event("e2")),
                        OnNext(1L,new Event("e3"))
                    );
            upstream
            .Setup(c => c.Get())
            .Returns(inputObservable);


        }

        void Act()
        {
            sut.Exercise(cancellationToken);
            //Do sth with scheduller to fire all events 
            //I've tested these:
            //scheduler.Start();
            //scheduler.AdvanceTo(30000L);
        }


        void Assert()
        {
            inputObservable.ForEach(e =>
                bus.Verify(b => 
                    b.Publish(e, cancellationToken)
                ));
        }
    }

但无论我尝试什么,测试都不会停止。

更新:

完整的生产代码如下:

class SUT 
{
     IUpstream _upstream;
     public SUT(IUpstream upstream)
     {
          _upstream = upstream;
     }
     void Exercise(CancellationToken cancellationToken)
     {
         _upstream .Get()
               .ForEachAsync(async 
                    e => await _bus.Publish(e, cancellationToken));
     }
}


interface IUpstream
{
    IObservable<string> Get();
}

interface IBus
{
    void Publish<T>(T,System.Threading.CancellationToken)
}

ITestableObservable<T>.Messages 应该用于断言而不是 ITestableObservable<T> 本身。

    void Assert()
    {
        inputObservable
                .Messages.ToList()
                .ForEach(e =>
                bus.Verify(b =>
                    b.Publish(e.Value.Value, cancellationToken)));
    }

所以动作应该像下面这样:

    void Act()
    {
        sut.Exercise(cancellationToken);
        scheduler.Start();
        //OR:
        //scheduler.AdvanceBy(1L);
    }