为什么我的 Akka.NET 流订阅者没有收到消息?
Why isn't my Akka.NET Stream Subscriber receiving messages?
我正在尝试编写一个简单的 Akka.NET 流。来源是 IActorRef
。接收器是 ISubscriber
。我正在使用 TestKit 将其实现为单元测试:
[Fact]
public void AkkaStreams_ActorSourcePublisherSink_Works()
{
using (var materializer = Sys.Materializer())
{
var probe = CreateTestProbe();
var source = Source.ActorRef<HandlerErrorEvent>(10, OverflowStrategy.DropNew);
var subscriber = new Mock<ISubscriber<HandlerErrorEvent>>();
var sink = Sink.FromSubscriber<HandlerErrorEvent>(subscriber.Object);
var graph = source.ToMaterialized(sink, Keep.Both);
var (actor, publisher) = graph.Run(materializer);
subscriber.Verify(s => s.OnSubscribe(It.IsAny<ISubscription>()));
var evnt = new HandlerErrorEvent("", HandlerResult.NotHandled);
actor.Tell(evnt, ActorRefs.Nobody);
base.AwaitCondition(() =>
{
try
{
subscriber.Verify(s => s.OnNext(It.IsAny<HandlerErrorEvent>()));
return true;
}
catch(MockException)
{
return false;
}
});
}
}
对 OnSubscribe
方法的初始 Verify
调用顺利通过,但模拟订阅者从未收到对 OnNext
.
的调用
我做错了什么?
运行 为 netcoreapp2.0
。参考文献:
"Akka.TestKit.Xunit2" Version="1.3.2"
"Microsoft.NET.Test.Sdk" Version="15.5.0"
"Moq" Version="4.8.0-rc1"
"xunit" Version="2.3.1"
"xunit.runner.visualstudio" Version="2.3.1"
"dotnet-xunit" Version="2.3.1"
您的 ISubscriber<>
模拟不符合 Reactive Streams specification. It states, that in order to get any message after subscribing, subscriber must first communicate a demand using ISubscription.Request(long) 方法。
一般来说,如果您使用 Akka.Streams 测试套件,则不需要模拟订阅。只需下载 Akka.Streams.TestKit to get extension methods for Akka.Streams. This way you'll be able to build a fake subscriber simply by calling this.CreateManualSubscriberProbe<HandlerErrorEvent>();
inside your TestKit
class. It contains a dozens of methods 即可用于断言。
示例:
public class ExampleTest : TestKit
{
[Fact]
public void Select_should_map_output()
{
using (var materializer = Sys.Materializer())
{
// create test probe for subscriptions
var probe = this.CreateManualSubscriberProbe<int>();
// create flow materialized as publisher
var publisher = Source.From(new[] { 1, 2, 3 })
.Select(i => i + 1)
.RunWith(Sink.AsPublisher<int>(fanout: false), materializer);
// subscribe probe and receive subscription
publisher.Subscribe(probe);
var subscription = probe.ExpectSubscription();
// request number of elements to receive, here drain source utill the end
subscription.Request(4);
// validate assertions
probe.ExpectNext(2);
probe.ExpectNext(3);
probe.ExpectNext(4);
// since source had finite number of 3 elements, expect it to complete
probe.ExpectComplete();
}
}
}
我正在尝试编写一个简单的 Akka.NET 流。来源是 IActorRef
。接收器是 ISubscriber
。我正在使用 TestKit 将其实现为单元测试:
[Fact]
public void AkkaStreams_ActorSourcePublisherSink_Works()
{
using (var materializer = Sys.Materializer())
{
var probe = CreateTestProbe();
var source = Source.ActorRef<HandlerErrorEvent>(10, OverflowStrategy.DropNew);
var subscriber = new Mock<ISubscriber<HandlerErrorEvent>>();
var sink = Sink.FromSubscriber<HandlerErrorEvent>(subscriber.Object);
var graph = source.ToMaterialized(sink, Keep.Both);
var (actor, publisher) = graph.Run(materializer);
subscriber.Verify(s => s.OnSubscribe(It.IsAny<ISubscription>()));
var evnt = new HandlerErrorEvent("", HandlerResult.NotHandled);
actor.Tell(evnt, ActorRefs.Nobody);
base.AwaitCondition(() =>
{
try
{
subscriber.Verify(s => s.OnNext(It.IsAny<HandlerErrorEvent>()));
return true;
}
catch(MockException)
{
return false;
}
});
}
}
对 OnSubscribe
方法的初始 Verify
调用顺利通过,但模拟订阅者从未收到对 OnNext
.
我做错了什么?
运行 为 netcoreapp2.0
。参考文献:
"Akka.TestKit.Xunit2" Version="1.3.2"
"Microsoft.NET.Test.Sdk" Version="15.5.0"
"Moq" Version="4.8.0-rc1"
"xunit" Version="2.3.1"
"xunit.runner.visualstudio" Version="2.3.1"
"dotnet-xunit" Version="2.3.1"
您的 ISubscriber<>
模拟不符合 Reactive Streams specification. It states, that in order to get any message after subscribing, subscriber must first communicate a demand using ISubscription.Request(long) 方法。
一般来说,如果您使用 Akka.Streams 测试套件,则不需要模拟订阅。只需下载 Akka.Streams.TestKit to get extension methods for Akka.Streams. This way you'll be able to build a fake subscriber simply by calling this.CreateManualSubscriberProbe<HandlerErrorEvent>();
inside your TestKit
class. It contains a dozens of methods 即可用于断言。
示例:
public class ExampleTest : TestKit
{
[Fact]
public void Select_should_map_output()
{
using (var materializer = Sys.Materializer())
{
// create test probe for subscriptions
var probe = this.CreateManualSubscriberProbe<int>();
// create flow materialized as publisher
var publisher = Source.From(new[] { 1, 2, 3 })
.Select(i => i + 1)
.RunWith(Sink.AsPublisher<int>(fanout: false), materializer);
// subscribe probe and receive subscription
publisher.Subscribe(probe);
var subscription = probe.ExpectSubscription();
// request number of elements to receive, here drain source utill the end
subscription.Request(4);
// validate assertions
probe.ExpectNext(2);
probe.ExpectNext(3);
probe.ExpectNext(4);
// since source had finite number of 3 elements, expect it to complete
probe.ExpectComplete();
}
}
}