如何对每个订阅进行单元测试 Rx.Net
How to unit test Rx.Net foreach subscription
所有消息都应该发布到消息总线:
upstream.Get().ForEachAsync(async e => await _bus.Publish(e, cancellationToken));
我想进行单元测试以验证发布方法是否被正确调用:
[Theory, AutoMoqData]
public void Publish_ShouldPublishAllDataSources(
[Frozen]Mock<IBus> bus,
[Frozen]Mock<IUpstream> upstream,
SUT sut,
TestScheduler scheduler,
CancellationToken cancellationToken
)
{
{
Arrange();
Act();
Assert();
}
ITestableObservable<Event> inputObservable;
void Arrange()
{
inputObservable =
scheduler.CreateColdObservable(
OnNext(1L,new Event("e1")),
OnNext(1L,new Event("e2")),
OnNext(1L,new Event("e3"))
);
upstream
.Setup(c => c.Get())
.Returns(inputObservable);
}
void Act()
{
sut.Exercise(cancellationToken);
//Do sth with scheduller to fire all events
//I've tested these:
//scheduler.Start();
//scheduler.AdvanceTo(30000L);
}
void Assert()
{
inputObservable.ForEach(e =>
bus.Verify(b =>
b.Publish(e, cancellationToken)
));
}
}
但无论我尝试什么,测试都不会停止。
更新:
完整的生产代码如下:
class SUT
{
IUpstream _upstream;
public SUT(IUpstream upstream)
{
_upstream = upstream;
}
void Exercise(CancellationToken cancellationToken)
{
_upstream .Get()
.ForEachAsync(async
e => await _bus.Publish(e, cancellationToken));
}
}
interface IUpstream
{
IObservable<string> Get();
}
interface IBus
{
void Publish<T>(T,System.Threading.CancellationToken)
}
ITestableObservable<T>.Messages
应该用于断言而不是 ITestableObservable<T>
本身。
void Assert()
{
inputObservable
.Messages.ToList()
.ForEach(e =>
bus.Verify(b =>
b.Publish(e.Value.Value, cancellationToken)));
}
所以动作应该像下面这样:
void Act()
{
sut.Exercise(cancellationToken);
scheduler.Start();
//OR:
//scheduler.AdvanceBy(1L);
}
所有消息都应该发布到消息总线:
upstream.Get().ForEachAsync(async e => await _bus.Publish(e, cancellationToken));
我想进行单元测试以验证发布方法是否被正确调用:
[Theory, AutoMoqData]
public void Publish_ShouldPublishAllDataSources(
[Frozen]Mock<IBus> bus,
[Frozen]Mock<IUpstream> upstream,
SUT sut,
TestScheduler scheduler,
CancellationToken cancellationToken
)
{
{
Arrange();
Act();
Assert();
}
ITestableObservable<Event> inputObservable;
void Arrange()
{
inputObservable =
scheduler.CreateColdObservable(
OnNext(1L,new Event("e1")),
OnNext(1L,new Event("e2")),
OnNext(1L,new Event("e3"))
);
upstream
.Setup(c => c.Get())
.Returns(inputObservable);
}
void Act()
{
sut.Exercise(cancellationToken);
//Do sth with scheduller to fire all events
//I've tested these:
//scheduler.Start();
//scheduler.AdvanceTo(30000L);
}
void Assert()
{
inputObservable.ForEach(e =>
bus.Verify(b =>
b.Publish(e, cancellationToken)
));
}
}
但无论我尝试什么,测试都不会停止。
更新:
完整的生产代码如下:
class SUT
{
IUpstream _upstream;
public SUT(IUpstream upstream)
{
_upstream = upstream;
}
void Exercise(CancellationToken cancellationToken)
{
_upstream .Get()
.ForEachAsync(async
e => await _bus.Publish(e, cancellationToken));
}
}
interface IUpstream
{
IObservable<string> Get();
}
interface IBus
{
void Publish<T>(T,System.Threading.CancellationToken)
}
ITestableObservable<T>.Messages
应该用于断言而不是 ITestableObservable<T>
本身。
void Assert()
{
inputObservable
.Messages.ToList()
.ForEach(e =>
bus.Verify(b =>
b.Publish(e.Value.Value, cancellationToken)));
}
所以动作应该像下面这样:
void Act()
{
sut.Exercise(cancellationToken);
scheduler.Start();
//OR:
//scheduler.AdvanceBy(1L);
}