是否可以等待 Scala 中的第二次响应
Is it possible to await for second response in Scala
假设我有演员 A。
A 期望收到消息,一旦收到消息,它就会发回两条消息。
A extends Actor {
def receive: Receive = {
case M1 =>
context.sender ! M2
context.sender ! M3
}
}
在演员 A 中,我想发送一条消息,然后等待两个响应。
我知道以
这样的方式进行一次回应很容易
val task = A ? M1
Await.result(task, timeout)
但我不确定是否可以使用两个连续的消息。
发送两个单独的消息很重要,因为我只需要在另一个地方等待其中的第一个。
向发件人return发送一个包含 M2 和 M3 的元组如何?
import akka.pattern.ask
import akka.actor.{Props, ActorSystem, Actor}
import akka.util.Timeout
import com.test.A.{M1, M2, M3}
import scala.concurrent.Await
import scala.concurrent.duration._
object Test extends App {
implicit val timeout = Timeout(5 seconds)
val system = ActorSystem("test-system")
val actor = system.actorOf(Props[A], name = "a-actor")
val future = actor ? M1
val await = Await.result(future, Duration.Inf)
println(await)
}
class A extends Actor {
override def receive: Receive = {
case M1 => sender() ! (M2, M3)
}
}
object A {
case object M1
case object M2
case object M3
}
运行 这将导致:
(M2,M3)
在确实需要等待两条消息的情况下,可以通过引入一个中间Actor来解决这个问题。
这位演员看起来像这样:
class AggregationActor(aActor: ActorRef) extends Actor {
var awaitForM2: Option[M2] = None
var awaitForM3: Option[M3] = None
var originalSender: Option[ActorRef] = None
def receive: Receive = {
case M1 =>
// We save the sender
originalSender = Some(sender())
// Proxy the message
aActor ! M1
case M2 =>
awaitForM2 = Some(M2)
checkIfBothMessagesHaveArrived()
case M3 =>
awaitForM3 = Some(M3)
checkIfBothMessagesHaveArrived()
}
private def checkIfBothMessagesHaveArrived() = {
for {
m2 <- awaitForM2
m3 <- awaitForM3
s <- originalSender
} {
// Send as a tuple
s ! (m2, m3)
// Shutdown, our task is done
context.stop(self)
}
}
}
本质上它有内部状态并跟踪M1
和M2
响应是如何到达的。
你可以这样使用:
def awaitBothMessages(input: M1, underlyingAActor: ActorRef, system: ActorSystem): Future[(M2, M3)] = {
val aggregationActor = system.actorOf(Props(new AggregationActor(aActor)))
(aggregationActor ? input).mapTo[(M2, M3)]
}
val system = ActorSystem("test")
val aActor = system.actorOf(Props(new A), name = "aActor")
// Awaiting the first message only:
val firstMessage = aActor ? M1
val first = Await.result(firstMessage, Duration.Inf)
// Awaiting both messages:
val bothMessages: Future[(M2, M3)] = awaitBothMessages(M1, aActor, system)
val both = Await.result(firstMessage, Duration.Inf)
假设我有演员 A。 A 期望收到消息,一旦收到消息,它就会发回两条消息。
A extends Actor {
def receive: Receive = {
case M1 =>
context.sender ! M2
context.sender ! M3
}
}
在演员 A 中,我想发送一条消息,然后等待两个响应。 我知道以
这样的方式进行一次回应很容易val task = A ? M1
Await.result(task, timeout)
但我不确定是否可以使用两个连续的消息。
发送两个单独的消息很重要,因为我只需要在另一个地方等待其中的第一个。
向发件人return发送一个包含 M2 和 M3 的元组如何?
import akka.pattern.ask
import akka.actor.{Props, ActorSystem, Actor}
import akka.util.Timeout
import com.test.A.{M1, M2, M3}
import scala.concurrent.Await
import scala.concurrent.duration._
object Test extends App {
implicit val timeout = Timeout(5 seconds)
val system = ActorSystem("test-system")
val actor = system.actorOf(Props[A], name = "a-actor")
val future = actor ? M1
val await = Await.result(future, Duration.Inf)
println(await)
}
class A extends Actor {
override def receive: Receive = {
case M1 => sender() ! (M2, M3)
}
}
object A {
case object M1
case object M2
case object M3
}
运行 这将导致:
(M2,M3)
在确实需要等待两条消息的情况下,可以通过引入一个中间Actor来解决这个问题。
这位演员看起来像这样:
class AggregationActor(aActor: ActorRef) extends Actor {
var awaitForM2: Option[M2] = None
var awaitForM3: Option[M3] = None
var originalSender: Option[ActorRef] = None
def receive: Receive = {
case M1 =>
// We save the sender
originalSender = Some(sender())
// Proxy the message
aActor ! M1
case M2 =>
awaitForM2 = Some(M2)
checkIfBothMessagesHaveArrived()
case M3 =>
awaitForM3 = Some(M3)
checkIfBothMessagesHaveArrived()
}
private def checkIfBothMessagesHaveArrived() = {
for {
m2 <- awaitForM2
m3 <- awaitForM3
s <- originalSender
} {
// Send as a tuple
s ! (m2, m3)
// Shutdown, our task is done
context.stop(self)
}
}
}
本质上它有内部状态并跟踪M1
和M2
响应是如何到达的。
你可以这样使用:
def awaitBothMessages(input: M1, underlyingAActor: ActorRef, system: ActorSystem): Future[(M2, M3)] = {
val aggregationActor = system.actorOf(Props(new AggregationActor(aActor)))
(aggregationActor ? input).mapTo[(M2, M3)]
}
val system = ActorSystem("test")
val aActor = system.actorOf(Props(new A), name = "aActor")
// Awaiting the first message only:
val firstMessage = aActor ? M1
val first = Await.result(firstMessage, Duration.Inf)
// Awaiting both messages:
val bothMessages: Future[(M2, M3)] = awaitBothMessages(M1, aActor, system)
val both = Await.result(firstMessage, Duration.Inf)