将 futures 与 actor 消息混合时确保测试中的消息顺序
Ensure message order in test when mixing futures with actor messages
我正在测试一个使用基于异步未来的 actor API。当未来完成时,参与者使用管道模式向自己发送消息:
import akka.pattern.pipe
// ...
// somewhere in the actor's receive method
futureBasedApi.doSomething().pipeTo(self)
在我的测试中,我模拟了 API,所以我通过承诺控制未来的完成。但是,这与直接发送给参与者的其他消息交织在一起:
myActor ! Message("A")
promiseFromApiCall.success(Message("B"))
myActor ! Message("C")
现在我想知道如何保证演员收到并
在我的测试中在消息A和C之间处理消息B,因为消息B实际上是在另一个线程中发送的,所以我无法控制顺序
演员的邮箱在其中接收消息。
我考虑了几种可能的解决方案:
在每条消息后休眠几毫秒以制作另一条消息
不太可能订购
等待演员确认每条消息,尽管
只有测试需要确认
直接发送消息B给actor模拟完成
未来并编写一个单独的测试以确保管道模式
被正确使用(如果演员愿意,上面的测试不会失败
不将结果消息通过管道传递给自身)
我不太喜欢这两个选项,但我倾向于使用最后一个
一。还有其他更好的方法可以在测试中强制执行特定的消息顺序吗?
澄清:问题不在于如何处理生产中可能以随机顺序接收消息这一事实。控制测试中的顺序对于确保参与者实际可以处理不同的消息顺序至关重要。
一个想法是在你的 actor 中定义一个标志,指示 actor 是否收到消息 B。当 actor 收到消息 C 时,如果标志为 false,则 actor 可以 stash 该消息 C,然后一旦 actor 收到消息 B 就将其取消。例如:
class MyActor extends Actor with Stash {
def receiveBlock(seenMsgB: Boolean, seenMsgC: Boolean): Receive = {
case MakeApiCall =>
callExternalApi().mapTo[MessageB].pipeTo(self)
case m: MessageB if seenMsgC => // assume msg C has been stashed
unstashAll()
// ...do something with msg B
become(receiveBlock(true, seenMsgC)) // true, true
case m: MessageB if !seenMsgC =>
// ...do something with message B
become(receiveBlock(true, seenMsgC)) // true, false
case m: MessageC if seenMsgB =>
// ...do something with message C
context.become(receiveBlock(seenMsgB, true)) // true, true
case m: MessageC if !seenMsgB =>
stash()
context.become(receiveBlock(seenMsgB, true)) // false, true
case ...
}
def receive = receiveBlock(false, false)
}
如果订购消息如此重要,您应该使用 ask
(?
) 其中 returns Future
并将它们链接起来,即使您不希望收到任何回复演员。
在阅读了很多关于 akka 的内容之后,我终于找到了一个更好的解决方案:将 actor 邮箱替换为我可以在测试中观察到的邮箱。这样我就可以等到actor在我完成promise后收到新的消息。只有这样才能发送下一条消息。此 TestingMailbox
的代码在 post.
的末尾给出
更新: 在 Akka Typed 中,这可以通过 BehaviorInterceptor
非常优雅地实现。只需用自定义拦截器包装被测 Behavior
即可转发所有消息和信号,但让您观察它们。
下面给出untyped Akka的邮箱解决方案
演员可以这样配置:
actorUnderTest = system.actorOf(Props[MyActor]).withMailbox("testing-mailbox"))
我必须通过提供配置来确保 "testing-mailbox" 被 actor 系统识别:
class MyTest extends TestKit(ActorSystem("some name",
ConfigFactory.parseString("""{
testing-mailbox = {
mailbox-type = "my.package.TestingMailbox"
}
}""")))
with BeforeAndAfterAll // ... and so on
设置完成后,我可以像这样更改我的测试:
myActor ! Message("A")
val nextMessage = TestingMailbox.nextMessage(actorUnderTest)
promiseFromApiCall.success(Message("B"))
Await.ready(nextMessage, 3.seconds)
myActor ! Message("C")
有了一点辅助方法,我什至可以这样写:
myActor ! Message("A")
receiveMessageAfter { promiseFromApiCall.success(Message("B")) }
myActor ! Message("C")
这是我的自定义邮箱:
import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch._
import com.typesafe.config.Config
import scala.concurrent.{Future, Promise}
object TestingMailbox {
val promisesByReceiver =
scala.collection.concurrent.TrieMap[ActorRef, Promise[Any]]()
class MessageQueue extends UnboundedMailbox.MessageQueue {
override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
super.enqueue(receiver, handle)
promisesByReceiver.remove(receiver).foreach(_.success(handle.message))
}
}
def nextMessage(receiver: ActorRef): Future[Any] =
promisesByReceiver.getOrElseUpdate(receiver, Promise[Any]).future
}
class TestingMailbox extends MailboxType
with ProducesMessageQueue[TestingMailbox.MessageQueue] {
import TestingMailbox._
def this(settings: ActorSystem.Settings, config: Config) = this()
final override def create(owner: Option[ActorRef],
system: Option[ActorSystem]) =
new MessageQueue()
}
我正在测试一个使用基于异步未来的 actor API。当未来完成时,参与者使用管道模式向自己发送消息:
import akka.pattern.pipe
// ...
// somewhere in the actor's receive method
futureBasedApi.doSomething().pipeTo(self)
在我的测试中,我模拟了 API,所以我通过承诺控制未来的完成。但是,这与直接发送给参与者的其他消息交织在一起:
myActor ! Message("A")
promiseFromApiCall.success(Message("B"))
myActor ! Message("C")
现在我想知道如何保证演员收到并 在我的测试中在消息A和C之间处理消息B,因为消息B实际上是在另一个线程中发送的,所以我无法控制顺序 演员的邮箱在其中接收消息。
我考虑了几种可能的解决方案:
在每条消息后休眠几毫秒以制作另一条消息 不太可能订购
等待演员确认每条消息,尽管 只有测试需要确认
直接发送消息B给actor模拟完成 未来并编写一个单独的测试以确保管道模式 被正确使用(如果演员愿意,上面的测试不会失败 不将结果消息通过管道传递给自身)
我不太喜欢这两个选项,但我倾向于使用最后一个 一。还有其他更好的方法可以在测试中强制执行特定的消息顺序吗?
澄清:问题不在于如何处理生产中可能以随机顺序接收消息这一事实。控制测试中的顺序对于确保参与者实际可以处理不同的消息顺序至关重要。
一个想法是在你的 actor 中定义一个标志,指示 actor 是否收到消息 B。当 actor 收到消息 C 时,如果标志为 false,则 actor 可以 stash 该消息 C,然后一旦 actor 收到消息 B 就将其取消。例如:
class MyActor extends Actor with Stash {
def receiveBlock(seenMsgB: Boolean, seenMsgC: Boolean): Receive = {
case MakeApiCall =>
callExternalApi().mapTo[MessageB].pipeTo(self)
case m: MessageB if seenMsgC => // assume msg C has been stashed
unstashAll()
// ...do something with msg B
become(receiveBlock(true, seenMsgC)) // true, true
case m: MessageB if !seenMsgC =>
// ...do something with message B
become(receiveBlock(true, seenMsgC)) // true, false
case m: MessageC if seenMsgB =>
// ...do something with message C
context.become(receiveBlock(seenMsgB, true)) // true, true
case m: MessageC if !seenMsgB =>
stash()
context.become(receiveBlock(seenMsgB, true)) // false, true
case ...
}
def receive = receiveBlock(false, false)
}
如果订购消息如此重要,您应该使用 ask
(?
) 其中 returns Future
并将它们链接起来,即使您不希望收到任何回复演员。
在阅读了很多关于 akka 的内容之后,我终于找到了一个更好的解决方案:将 actor 邮箱替换为我可以在测试中观察到的邮箱。这样我就可以等到actor在我完成promise后收到新的消息。只有这样才能发送下一条消息。此 TestingMailbox
的代码在 post.
更新: 在 Akka Typed 中,这可以通过 BehaviorInterceptor
非常优雅地实现。只需用自定义拦截器包装被测 Behavior
即可转发所有消息和信号,但让您观察它们。
下面给出untyped Akka的邮箱解决方案
演员可以这样配置:
actorUnderTest = system.actorOf(Props[MyActor]).withMailbox("testing-mailbox"))
我必须通过提供配置来确保 "testing-mailbox" 被 actor 系统识别:
class MyTest extends TestKit(ActorSystem("some name",
ConfigFactory.parseString("""{
testing-mailbox = {
mailbox-type = "my.package.TestingMailbox"
}
}""")))
with BeforeAndAfterAll // ... and so on
设置完成后,我可以像这样更改我的测试:
myActor ! Message("A")
val nextMessage = TestingMailbox.nextMessage(actorUnderTest)
promiseFromApiCall.success(Message("B"))
Await.ready(nextMessage, 3.seconds)
myActor ! Message("C")
有了一点辅助方法,我什至可以这样写:
myActor ! Message("A")
receiveMessageAfter { promiseFromApiCall.success(Message("B")) }
myActor ! Message("C")
这是我的自定义邮箱:
import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch._
import com.typesafe.config.Config
import scala.concurrent.{Future, Promise}
object TestingMailbox {
val promisesByReceiver =
scala.collection.concurrent.TrieMap[ActorRef, Promise[Any]]()
class MessageQueue extends UnboundedMailbox.MessageQueue {
override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
super.enqueue(receiver, handle)
promisesByReceiver.remove(receiver).foreach(_.success(handle.message))
}
}
def nextMessage(receiver: ActorRef): Future[Any] =
promisesByReceiver.getOrElseUpdate(receiver, Promise[Any]).future
}
class TestingMailbox extends MailboxType
with ProducesMessageQueue[TestingMailbox.MessageQueue] {
import TestingMailbox._
def this(settings: ActorSystem.Settings, config: Config) = this()
final override def create(owner: Option[ActorRef],
system: Option[ActorSystem]) =
new MessageQueue()
}