如何测试 Akka 集群中 DistributedPubSub 的发布者?
How can I test a publisher to a DistributedPubSub in Akka Cluster?
我有一个参与者,其唯一职责是将从外部界面(命令行、用户等)接收到的消息转发到适当的主题。我想测试它是否正确发布这些消息。
我需要创建一些虚拟订阅者,这些订阅者会期望消息发布到某个主题并对接收到的消息做出断言。
这是我试图实现的代码:
Messages.scala
case class Foo(foo: String)
InterfaceForwardingActor.scala
import akka.actor.{Actor, ActorLogging}
import akka.cluster.pubsub.{DistributedPubSub, DistributedPubSubMediator}
import com.typesafe.config.Config
/** Actor responsible for forwarding stimuli external to the system.
* For instance, messages from the command-line interface or from a UI.
*
*/
class InterfaceForwardingActor extends Actor with ActorLogging {
import DistributedPubSubMediator.Publish
protected val mediator = DistributedPubSub(context.system).mediator
log.info(s"Hello from interface forwarder.")
final val topic = "info"
def receive = {
case foo: Foo => {
log.info("Forwarding a Foo message")
mediator ! Publish(topic, foo)
}
}
}
和测试代码
InterfaceForwardingActorTest.scala
import akka.actor.{ActorSystem, Props}
import akka.cluster.client.ClusterClient.Publish
import akka.cluster.pubsub.DistributedPubSub
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import com.typesafe.config.ConfigFactory
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
class InterfaceForwardingActorTest extends
TestKit(ActorSystem("InterfaceForwardingActorSpec")) with ImplicitSender with
WordSpecLike with Matchers with BeforeAndAfterAll {
override def afterAll {
TestKit.shutdownActorSystem(system)
}
"An InterfaceForwardingActor" must {
val interfaceForwardingActor = system.actorOf(Props[InterfaceForwardingActor])
val probe = TestProbe()
val mediator = DistributedPubSub(system).mediator
// subscribe the test probe to the "info" topic
mediator ! Publish("info", probe.ref)
"publish a Foo message" in {
val msg = Foo("test")
interfaceForwardingActor ! msg
probe.expectMsg(msg)
}
}
}
我发现订阅info
主题的probe
在默认的3秒超时时间内没有收到消息,断言失败。然而,有趣的是,我确实看到日志消息表明接口转发 actor 确实在转发 Foo 消息。
我在测试中做错了什么?
TestProbe
应该订阅测试代码中的主题:
mediator ! Subscribe("info", probe.ref)
而不是
mediator ! Publish("info", probe.ref)
分布式 pub-sub 的文档页面是 here,供参考。
我有一个参与者,其唯一职责是将从外部界面(命令行、用户等)接收到的消息转发到适当的主题。我想测试它是否正确发布这些消息。
我需要创建一些虚拟订阅者,这些订阅者会期望消息发布到某个主题并对接收到的消息做出断言。
这是我试图实现的代码:
Messages.scala
case class Foo(foo: String)
InterfaceForwardingActor.scala
import akka.actor.{Actor, ActorLogging}
import akka.cluster.pubsub.{DistributedPubSub, DistributedPubSubMediator}
import com.typesafe.config.Config
/** Actor responsible for forwarding stimuli external to the system.
* For instance, messages from the command-line interface or from a UI.
*
*/
class InterfaceForwardingActor extends Actor with ActorLogging {
import DistributedPubSubMediator.Publish
protected val mediator = DistributedPubSub(context.system).mediator
log.info(s"Hello from interface forwarder.")
final val topic = "info"
def receive = {
case foo: Foo => {
log.info("Forwarding a Foo message")
mediator ! Publish(topic, foo)
}
}
}
和测试代码
InterfaceForwardingActorTest.scala
import akka.actor.{ActorSystem, Props}
import akka.cluster.client.ClusterClient.Publish
import akka.cluster.pubsub.DistributedPubSub
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
import com.typesafe.config.ConfigFactory
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
class InterfaceForwardingActorTest extends
TestKit(ActorSystem("InterfaceForwardingActorSpec")) with ImplicitSender with
WordSpecLike with Matchers with BeforeAndAfterAll {
override def afterAll {
TestKit.shutdownActorSystem(system)
}
"An InterfaceForwardingActor" must {
val interfaceForwardingActor = system.actorOf(Props[InterfaceForwardingActor])
val probe = TestProbe()
val mediator = DistributedPubSub(system).mediator
// subscribe the test probe to the "info" topic
mediator ! Publish("info", probe.ref)
"publish a Foo message" in {
val msg = Foo("test")
interfaceForwardingActor ! msg
probe.expectMsg(msg)
}
}
}
我发现订阅info
主题的probe
在默认的3秒超时时间内没有收到消息,断言失败。然而,有趣的是,我确实看到日志消息表明接口转发 actor 确实在转发 Foo 消息。
我在测试中做错了什么?
TestProbe
应该订阅测试代码中的主题:
mediator ! Subscribe("info", probe.ref)
而不是
mediator ! Publish("info", probe.ref)
分布式 pub-sub 的文档页面是 here,供参考。